[GCS]Fix ServiceBasedGcsClientTest bug (#11031)

Co-authored-by: 灵洵 <fengbin.ffb@antfin.com>
This commit is contained in:
fangfengbin
2020-09-28 14:50:36 +08:00
committed by GitHub
parent 86e5db4d59
commit 142234cbcb
7 changed files with 104 additions and 4 deletions
+24
View File
@@ -173,6 +173,12 @@ class ActorInfoAccessor {
/// \param is_pubsub_server_restarted Whether pubsub server is restarted.
virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0;
/// Check if the specified actor is unsubscribed.
///
/// \param actor_id The ID of the actor.
/// \return Whether the specified actor is unsubscribed.
virtual bool IsActorUnsubscribed(const ActorID &actor_id) = 0;
protected:
ActorInfoAccessor() = default;
};
@@ -337,6 +343,18 @@ class TaskInfoAccessor {
/// \param is_pubsub_server_restarted Whether pubsub server is restarted.
virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0;
/// Check if the specified task is unsubscribed.
///
/// \param task_id The ID of the task.
/// \return Whether the specified task is unsubscribed.
virtual bool IsTaskUnsubscribed(const TaskID &task_id) = 0;
/// Check if the specified task lease is unsubscribed.
///
/// \param task_id The ID of the task.
/// \return Whether the specified task lease is unsubscribed.
virtual bool IsTaskLeaseUnsubscribed(const TaskID &task_id) = 0;
protected:
TaskInfoAccessor() = default;
};
@@ -409,6 +427,12 @@ class ObjectInfoAccessor {
/// \param is_pubsub_server_restarted Whether pubsub server is restarted.
virtual void AsyncResubscribe(bool is_pubsub_server_restarted) = 0;
/// Check if the specified object is unsubscribed.
///
/// \param object_id The ID of the object.
/// \return Whether the specified object is unsubscribed.
virtual bool IsObjectUnsubscribed(const ObjectID &object_id) = 0;
protected:
ObjectInfoAccessor() = default;
};
@@ -452,6 +452,10 @@ void ServiceBasedActorInfoAccessor::AsyncResubscribe(bool is_pubsub_server_resta
}
}
bool ServiceBasedActorInfoAccessor::IsActorUnsubscribed(const ActorID &actor_id) {
return client_impl_->GetGcsPubSub().IsUnsubscribed(ACTOR_CHANNEL, actor_id.Hex());
}
ServiceBasedNodeInfoAccessor::ServiceBasedNodeInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl) {}
@@ -1106,6 +1110,14 @@ void ServiceBasedTaskInfoAccessor::AsyncResubscribe(bool is_pubsub_server_restar
}
}
bool ServiceBasedTaskInfoAccessor::IsTaskUnsubscribed(const TaskID &task_id) {
return client_impl_->GetGcsPubSub().IsUnsubscribed(TASK_CHANNEL, task_id.Hex());
}
bool ServiceBasedTaskInfoAccessor::IsTaskLeaseUnsubscribed(const TaskID &task_id) {
return client_impl_->GetGcsPubSub().IsUnsubscribed(TASK_LEASE_CHANNEL, task_id.Hex());
}
ServiceBasedObjectInfoAccessor::ServiceBasedObjectInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl) {}
@@ -1290,6 +1302,10 @@ Status ServiceBasedObjectInfoAccessor::AsyncUnsubscribeToLocations(
return status;
}
bool ServiceBasedObjectInfoAccessor::IsObjectUnsubscribed(const ObjectID &object_id) {
return client_impl_->GetGcsPubSub().IsUnsubscribed(OBJECT_CHANNEL, object_id.Hex());
}
ServiceBasedStatsInfoAccessor::ServiceBasedStatsInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl) {}
@@ -118,6 +118,8 @@ class ServiceBasedActorInfoAccessor : public ActorInfoAccessor {
void AsyncResubscribe(bool is_pubsub_server_restarted) override;
bool IsActorUnsubscribed(const ActorID &actor_id) override;
private:
/// Save the subscribe operation in this function, so we can call it again when PubSub
/// server restarts from a failure.
@@ -299,6 +301,10 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor {
void AsyncResubscribe(bool is_pubsub_server_restarted) override;
bool IsTaskUnsubscribed(const TaskID &task_id) override;
bool IsTaskLeaseUnsubscribed(const TaskID &task_id) override;
private:
/// Save the subscribe operations, so we can call them again when PubSub
/// server restarts from a failure.
@@ -343,6 +349,8 @@ class ServiceBasedObjectInfoAccessor : public ObjectInfoAccessor {
void AsyncResubscribe(bool is_pubsub_server_restarted) override;
bool IsObjectUnsubscribed(const ObjectID &object_id) override;
private:
// Mutex to protect the subscribe_object_operations_ field and
// fetch_object_data_operations_ field.
@@ -142,6 +142,13 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(actor_id));
}
void WaitForActorUnsubscribed(const ActorID &actor_id) {
auto condition = [this, actor_id]() {
return gcs_client_->Actors().IsActorUnsubscribed(actor_id);
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
}
bool SubscribeAllActors(
const gcs::SubscribeCallback<ActorID, rpc::ActorTableData> &subscribe) {
std::promise<bool> promise;
@@ -335,10 +342,23 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
}
void UnsubscribeTask(const TaskID &task_id) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(task_id));
}
void WaitForTaskUnsubscribed(const TaskID &task_id) {
auto condition = [this, task_id]() {
return gcs_client_->Tasks().IsTaskUnsubscribed(task_id);
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
}
void WaitForTaskLeaseUnsubscribed(const TaskID &task_id) {
auto condition = [this, task_id]() {
return gcs_client_->Tasks().IsTaskLeaseUnsubscribed(task_id);
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
}
bool AddTask(const std::shared_ptr<rpc::TaskTableData> task) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(
@@ -380,7 +400,6 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
}
void UnsubscribeTaskLease(const TaskID &task_id) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribeTaskLease(task_id));
}
@@ -411,10 +430,16 @@ class ServiceBasedGcsClientTest : public ::testing::Test {
}
void UnsubscribeToLocations(const ObjectID &object_id) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Objects().AsyncUnsubscribeToLocations(object_id));
}
void WaitForObjectUnsubscribed(const ObjectID &object_id) {
auto condition = [this, object_id]() {
return gcs_client_->Objects().IsObjectUnsubscribed(object_id);
};
EXPECT_TRUE(WaitForCondition(condition, timeout_ms_.count()));
}
bool AddLocation(const ObjectID &object_id, const NodeID &node_id) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Objects().AsyncAddLocation(
@@ -545,6 +570,7 @@ TEST_F(ServiceBasedGcsClientTest, TestActorInfo) {
// Cancel subscription to an actor.
UnsubscribeActor(actor_id);
WaitForActorUnsubscribed(actor_id);
// Update dynamic states of actor in GCS.
actor_table_data->set_state(rpc::ActorTableData::DEAD);
@@ -725,6 +751,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) {
// Cancel subscription to a task.
UnsubscribeTask(task_id);
WaitForTaskUnsubscribed(task_id);
// Add a task to GCS again.
ASSERT_TRUE(AddTask(task_table_data));
@@ -754,6 +781,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) {
// Cancel subscription to a task lease.
UnsubscribeTaskLease(task_id);
WaitForTaskLeaseUnsubscribed(task_id);
// Add a task lease to GCS again.
ASSERT_TRUE(AddTaskLease(task_lease));
@@ -805,6 +833,7 @@ TEST_F(ServiceBasedGcsClientTest, TestObjectInfo) {
// Cancel subscription to any update of an object's location.
UnsubscribeToLocations(object_id);
WaitForObjectUnsubscribed(object_id);
// Add location of object to GCS again.
ASSERT_TRUE(AddLocation(object_id, node_id));
@@ -965,7 +994,7 @@ TEST_F(ServiceBasedGcsClientTest, TestObjectTableResubscribe) {
// Cancel subscription to any update of an object's location.
UnsubscribeToLocations(object1_id);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
WaitForObjectUnsubscribed(object1_id);
// Restart GCS.
RestartGcsServer();
@@ -1062,6 +1091,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskTableResubscribe) {
WaitForExpectedCount(task_count, 1);
WaitForExpectedCount(task_lease_count, 1);
UnsubscribeTask(task_id);
WaitForTaskUnsubscribed(task_id);
RestartGcsServer();
+7
View File
@@ -174,5 +174,12 @@ std::string GcsPubSub::GenChannelPattern(const std::string &channel,
return pattern.str();
}
bool GcsPubSub::IsUnsubscribed(const std::string &channel, const std::string &id) {
std::string pattern = GenChannelPattern(channel, id);
absl::MutexLock lock(&mutex_);
return !channels_.contains(pattern);
}
} // namespace gcs
} // namespace ray
+7
View File
@@ -88,6 +88,13 @@ class GcsPubSub {
/// \return Status
Status Unsubscribe(const std::string &channel, const std::string &id);
/// Check if the specified ID under the specified channel is unsubscribed.
///
/// \param channel The channel to unsubscribe from redis.
/// \param id The id of message to be unsubscribed from redis.
/// \return Whether the specified ID under the specified channel is unsubscribed.
bool IsUnsubscribed(const std::string &channel, const std::string &id);
private:
/// Represents a caller's command to subscribe or unsubscribe to a given
/// channel.
+8
View File
@@ -87,6 +87,8 @@ class RedisLogBasedActorInfoAccessor : public ActorInfoAccessor {
void AsyncResubscribe(bool is_pubsub_server_restarted) override {}
bool IsActorUnsubscribed(const ActorID &actor_id) override { return false; }
protected:
virtual std::vector<ActorID> GetAllActorID() const;
virtual Status Get(const ActorID &actor_id, ActorTableData *actor_table_data) const;
@@ -244,6 +246,10 @@ class RedisTaskInfoAccessor : public TaskInfoAccessor {
void AsyncResubscribe(bool is_pubsub_server_restarted) override {}
bool IsTaskUnsubscribed(const TaskID &task_id) override { return false; }
bool IsTaskLeaseUnsubscribed(const TaskID &task_id) override { return false; }
private:
RedisGcsClient *client_impl_{nullptr};
// Use a random NodeID for task subscription. Because:
@@ -295,6 +301,8 @@ class RedisObjectInfoAccessor : public ObjectInfoAccessor {
void AsyncResubscribe(bool is_pubsub_server_restarted) override {}
bool IsObjectUnsubscribed(const ObjectID &object_id) override { return false; }
private:
RedisGcsClient *client_impl_{nullptr};