From 142234cbcb8edb09f34df68d37363d2c8bdb6598 Mon Sep 17 00:00:00 2001 From: fangfengbin <869218239a@zju.edu.cn> Date: Mon, 28 Sep 2020 14:50:36 +0800 Subject: [PATCH] [GCS]Fix ServiceBasedGcsClientTest bug (#11031) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: 灵洵 --- src/ray/gcs/accessor.h | 24 ++++++++++++ .../gcs/gcs_client/service_based_accessor.cc | 16 ++++++++ .../gcs/gcs_client/service_based_accessor.h | 8 ++++ .../test/service_based_gcs_client_test.cc | 38 +++++++++++++++++-- src/ray/gcs/pubsub/gcs_pub_sub.cc | 7 ++++ src/ray/gcs/pubsub/gcs_pub_sub.h | 7 ++++ src/ray/gcs/redis_accessor.h | 8 ++++ 7 files changed, 104 insertions(+), 4 deletions(-) diff --git a/src/ray/gcs/accessor.h b/src/ray/gcs/accessor.h index 1e29a5f53..5f36dd79a 100644 --- a/src/ray/gcs/accessor.h +++ b/src/ray/gcs/accessor.h @@ -173,6 +173,12 @@ class ActorInfoAccessor { /// \param is_pubsub_server_restarted Whether pubsub server is restarted. virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; + /// Check if the specified actor is unsubscribed. + /// + /// \param actor_id The ID of the actor. + /// \return Whether the specified actor is unsubscribed. + virtual bool IsActorUnsubscribed(const ActorID &actor_id) = 0; + protected: ActorInfoAccessor() = default; }; @@ -337,6 +343,18 @@ class TaskInfoAccessor { /// \param is_pubsub_server_restarted Whether pubsub server is restarted. virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; + /// Check if the specified task is unsubscribed. + /// + /// \param task_id The ID of the task. + /// \return Whether the specified task is unsubscribed. + virtual bool IsTaskUnsubscribed(const TaskID &task_id) = 0; + + /// Check if the specified task lease is unsubscribed. + /// + /// \param task_id The ID of the task. + /// \return Whether the specified task lease is unsubscribed. + virtual bool IsTaskLeaseUnsubscribed(const TaskID &task_id) = 0; + protected: TaskInfoAccessor() = default; }; @@ -409,6 +427,12 @@ class ObjectInfoAccessor { /// \param is_pubsub_server_restarted Whether pubsub server is restarted. virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0; + /// Check if the specified object is unsubscribed. + /// + /// \param object_id The ID of the object. + /// \return Whether the specified object is unsubscribed. + virtual bool IsObjectUnsubscribed(const ObjectID &object_id) = 0; + protected: ObjectInfoAccessor() = default; }; diff --git a/src/ray/gcs/gcs_client/service_based_accessor.cc b/src/ray/gcs/gcs_client/service_based_accessor.cc index d1f77cf02..07c4d6400 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.cc +++ b/src/ray/gcs/gcs_client/service_based_accessor.cc @@ -452,6 +452,10 @@ void ServiceBasedActorInfoAccessor::AsyncResubscribe(bool is_pubsub_server_resta } } +bool ServiceBasedActorInfoAccessor::IsActorUnsubscribed(const ActorID &actor_id) { + return client_impl_->GetGcsPubSub().IsUnsubscribed(ACTOR_CHANNEL, actor_id.Hex()); +} + ServiceBasedNodeInfoAccessor::ServiceBasedNodeInfoAccessor( ServiceBasedGcsClient *client_impl) : client_impl_(client_impl) {} @@ -1106,6 +1110,14 @@ void ServiceBasedTaskInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar } } +bool ServiceBasedTaskInfoAccessor::IsTaskUnsubscribed(const TaskID &task_id) { + return client_impl_->GetGcsPubSub().IsUnsubscribed(TASK_CHANNEL, task_id.Hex()); +} + +bool ServiceBasedTaskInfoAccessor::IsTaskLeaseUnsubscribed(const TaskID &task_id) { + return client_impl_->GetGcsPubSub().IsUnsubscribed(TASK_LEASE_CHANNEL, task_id.Hex()); +} + ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor( ServiceBasedGcsClient *client_impl) : client_impl_(client_impl) {} @@ -1290,6 +1302,10 @@ Status ServiceBasedObjectInfoAccessor::AsyncUnsubscribeToLocations( return status; } +bool ServiceBasedObjectInfoAccessor::IsObjectUnsubscribed(const ObjectID &object_id) { + return client_impl_->GetGcsPubSub().IsUnsubscribed(OBJECT_CHANNEL, object_id.Hex()); +} + ServiceBasedStatsInfoAccessor::ServiceBasedStatsInfoAccessor( ServiceBasedGcsClient *client_impl) : client_impl_(client_impl) {} diff --git a/src/ray/gcs/gcs_client/service_based_accessor.h b/src/ray/gcs/gcs_client/service_based_accessor.h index 05d5b2788..fcd8e1e20 100644 --- a/src/ray/gcs/gcs_client/service_based_accessor.h +++ b/src/ray/gcs/gcs_client/service_based_accessor.h @@ -118,6 +118,8 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor { void AsyncResubscribe(bool is_pubsub_server_restarted) override; + bool IsActorUnsubscribed(const ActorID &actor_id) override; + private: /// Save the subscribe operation in this function, so we can call it again when PubSub /// server restarts from a failure. @@ -299,6 +301,10 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor { void AsyncResubscribe(bool is_pubsub_server_restarted) override; + bool IsTaskUnsubscribed(const TaskID &task_id) override; + + bool IsTaskLeaseUnsubscribed(const TaskID &task_id) override; + private: /// Save the subscribe operations, so we can call them again when PubSub /// server restarts from a failure. @@ -343,6 +349,8 @@ class ServiceBasedObjectInfoAccessor : public ObjectInfoAccessor { void AsyncResubscribe(bool is_pubsub_server_restarted) override; + bool IsObjectUnsubscribed(const ObjectID &object_id) override; + private: // Mutex to protect the subscribe_object_operations_ field and // fetch_object_data_operations_ field. diff --git a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc index 310361ad7..6456212bb 100644 --- a/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc +++ b/src/ray/gcs/gcs_client/test/service_based_gcs_client_test.cc @@ -142,6 +142,13 @@ class ServiceBasedGcsClientTest : public ::testing::Test { RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(actor_id)); } + void WaitForActorUnsubscribed(const ActorID &actor_id) { + auto condition = [this, actor_id]() { + return gcs_client_->Actors().IsActorUnsubscribed(actor_id); + }; + EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); + } + bool SubscribeAllActors( const gcs::SubscribeCallback &subscribe) { std::promise promise; @@ -335,10 +342,23 @@ class ServiceBasedGcsClientTest : public ::testing::Test { } void UnsubscribeTask(const TaskID &task_id) { - std::promise promise; RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(task_id)); } + void WaitForTaskUnsubscribed(const TaskID &task_id) { + auto condition = [this, task_id]() { + return gcs_client_->Tasks().IsTaskUnsubscribed(task_id); + }; + EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); + } + + void WaitForTaskLeaseUnsubscribed(const TaskID &task_id) { + auto condition = [this, task_id]() { + return gcs_client_->Tasks().IsTaskLeaseUnsubscribed(task_id); + }; + EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); + } + bool AddTask(const std::shared_ptr task) { std::promise promise; RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd( @@ -380,7 +400,6 @@ class ServiceBasedGcsClientTest : public ::testing::Test { } void UnsubscribeTaskLease(const TaskID &task_id) { - std::promise promise; RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribeTaskLease(task_id)); } @@ -411,10 +430,16 @@ class ServiceBasedGcsClientTest : public ::testing::Test { } void UnsubscribeToLocations(const ObjectID &object_id) { - std::promise promise; RAY_CHECK_OK(gcs_client_->Objects().AsyncUnsubscribeToLocations(object_id)); } + void WaitForObjectUnsubscribed(const ObjectID &object_id) { + auto condition = [this, object_id]() { + return gcs_client_->Objects().IsObjectUnsubscribed(object_id); + }; + EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count())); + } + bool AddLocation(const ObjectID &object_id, const NodeID &node_id) { std::promise promise; RAY_CHECK_OK(gcs_client_->Objects().AsyncAddLocation( @@ -545,6 +570,7 @@ TEST_F(ServiceBasedGcsClientTest, TestActorInfo) { // Cancel subscription to an actor. UnsubscribeActor(actor_id); + WaitForActorUnsubscribed(actor_id); // Update dynamic states of actor in GCS. actor_table_data->set_state(rpc::ActorTableData::DEAD); @@ -725,6 +751,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) { // Cancel subscription to a task. UnsubscribeTask(task_id); + WaitForTaskUnsubscribed(task_id); // Add a task to GCS again. ASSERT_TRUE(AddTask(task_table_data)); @@ -754,6 +781,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) { // Cancel subscription to a task lease. UnsubscribeTaskLease(task_id); + WaitForTaskLeaseUnsubscribed(task_id); // Add a task lease to GCS again. ASSERT_TRUE(AddTaskLease(task_lease)); @@ -805,6 +833,7 @@ TEST_F(ServiceBasedGcsClientTest, TestObjectInfo) { // Cancel subscription to any update of an object's location. UnsubscribeToLocations(object_id); + WaitForObjectUnsubscribed(object_id); // Add location of object to GCS again. ASSERT_TRUE(AddLocation(object_id, node_id)); @@ -965,7 +994,7 @@ TEST_F(ServiceBasedGcsClientTest, TestObjectTableResubscribe) { // Cancel subscription to any update of an object's location. UnsubscribeToLocations(object1_id); - std::this_thread::sleep_for(std::chrono::milliseconds(100)); + WaitForObjectUnsubscribed(object1_id); // Restart GCS. RestartGcsServer(); @@ -1062,6 +1091,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskTableResubscribe) { WaitForExpectedCount(task_count, 1); WaitForExpectedCount(task_lease_count, 1); UnsubscribeTask(task_id); + WaitForTaskUnsubscribed(task_id); RestartGcsServer(); diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.cc b/src/ray/gcs/pubsub/gcs_pub_sub.cc index 62a28dea7..6ea273c0d 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.cc +++ b/src/ray/gcs/pubsub/gcs_pub_sub.cc @@ -174,5 +174,12 @@ std::string GcsPubSub::GenChannelPattern(const std::string &channel, return pattern.str(); } +bool GcsPubSub::IsUnsubscribed(const std::string &channel, const std::string &id) { + std::string pattern = GenChannelPattern(channel, id); + + absl::MutexLock lock(&mutex_); + return !channels_.contains(pattern); +} + } // namespace gcs } // namespace ray diff --git a/src/ray/gcs/pubsub/gcs_pub_sub.h b/src/ray/gcs/pubsub/gcs_pub_sub.h index 4ac1a6256..8ac06c9bf 100644 --- a/src/ray/gcs/pubsub/gcs_pub_sub.h +++ b/src/ray/gcs/pubsub/gcs_pub_sub.h @@ -88,6 +88,13 @@ class GcsPubSub { /// \return Status Status Unsubscribe(const std::string &channel, const std::string &id); + /// Check if the specified ID under the specified channel is unsubscribed. + /// + /// \param channel The channel to unsubscribe from redis. + /// \param id The id of message to be unsubscribed from redis. + /// \return Whether the specified ID under the specified channel is unsubscribed. + bool IsUnsubscribed(const std::string &channel, const std::string &id); + private: /// Represents a caller's command to subscribe or unsubscribe to a given /// channel. diff --git a/src/ray/gcs/redis_accessor.h b/src/ray/gcs/redis_accessor.h index b269bfd48..e94b152fd 100644 --- a/src/ray/gcs/redis_accessor.h +++ b/src/ray/gcs/redis_accessor.h @@ -87,6 +87,8 @@ class RedisLogBasedActorInfoAccessor : public ActorInfoAccessor { void AsyncResubscribe(bool is_pubsub_server_restarted) override {} + bool IsActorUnsubscribed(const ActorID &actor_id) override { return false; } + protected: virtual std::vector GetAllActorID() const; virtual Status Get(const ActorID &actor_id, ActorTableData *actor_table_data) const; @@ -244,6 +246,10 @@ class RedisTaskInfoAccessor : public TaskInfoAccessor { void AsyncResubscribe(bool is_pubsub_server_restarted) override {} + bool IsTaskUnsubscribed(const TaskID &task_id) override { return false; } + + bool IsTaskLeaseUnsubscribed(const TaskID &task_id) override { return false; } + private: RedisGcsClient *client_impl_{nullptr}; // Use a random NodeID for task subscription. Because: @@ -295,6 +301,8 @@ class RedisObjectInfoAccessor : public ObjectInfoAccessor { void AsyncResubscribe(bool is_pubsub_server_restarted) override {} + bool IsObjectUnsubscribed(const ObjectID &object_id) override { return false; } + private: RedisGcsClient *client_impl_{nullptr};