GCS adapts to task table pub sub (#8210)

This commit is contained in:
fangfengbin
2020-05-04 10:23:55 +08:00
committed by GitHub
parent e24276e3c1
commit 5f351a05fe
14 changed files with 77 additions and 67 deletions
+2 -5
View File
@@ -229,9 +229,8 @@ class TaskInfoAccessor {
/// Cancel subscription to a task asynchronously.
///
/// \param task_id The ID of the task to be unsubscribed to.
/// \param done Callback that will be called when unsubscribe is complete.
/// \return Status
virtual Status AsyncUnsubscribe(const TaskID &task_id, const StatusCallback &done) = 0;
virtual Status AsyncUnsubscribe(const TaskID &task_id) = 0;
/// Add a task lease to GCS asynchronously.
///
@@ -257,10 +256,8 @@ class TaskInfoAccessor {
/// Cancel subscription to a task lease asynchronously.
///
/// \param task_id The ID of the task to be unsubscribed to.
/// \param done Callback that will be called when unsubscribe is complete.
/// \return Status
virtual Status AsyncUnsubscribeTaskLease(const TaskID &task_id,
const StatusCallback &done) = 0;
virtual Status AsyncUnsubscribeTaskLease(const TaskID &task_id) = 0;
/// Attempt task reconstruction to GCS asynchronously.
///
@@ -571,7 +571,6 @@ ServiceBasedTaskInfoAccessor::ServiceBasedTaskInfoAccessor(
ServiceBasedGcsClient *client_impl)
: client_impl_(client_impl),
subscribe_id_(ClientID::FromRandom()),
task_sub_executor_(client_impl->GetRedisGcsClient().raylet_task_table()),
task_lease_sub_executor_(client_impl->GetRedisGcsClient().task_lease_table()) {}
Status ServiceBasedTaskInfoAccessor::AsyncAdd(
@@ -601,8 +600,7 @@ Status ServiceBasedTaskInfoAccessor::AsyncGet(
client_impl_->GetGcsRpcClient().GetTask(
request, [task_id, callback](const Status &status, const rpc::GetTaskReply &reply) {
if (reply.has_task_data()) {
TaskTableData task_table_data(reply.task_data());
callback(status, task_table_data);
callback(status, reply.task_data());
} else {
callback(status, boost::none);
}
@@ -636,16 +634,38 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribe(
const StatusCallback &done) {
RAY_LOG(DEBUG) << "Subscribing task, task id = " << task_id;
RAY_CHECK(subscribe != nullptr) << "Failed to subscribe task, task id = " << task_id;
auto status =
task_sub_executor_.AsyncSubscribe(subscribe_id_, task_id, subscribe, done);
auto on_subscribe = [task_id, subscribe](const std::string &id,
const std::string &data) {
TaskTableData task_data;
task_data.ParseFromString(data);
subscribe(task_id, task_data);
};
auto on_done = [this, task_id, subscribe, done](const Status &status) {
if (status.ok()) {
auto callback = [task_id, subscribe, done](
const Status &status,
const boost::optional<rpc::TaskTableData> &result) {
if (result) {
subscribe(task_id, *result);
}
if (done) {
done(status);
}
};
RAY_CHECK_OK(AsyncGet(task_id, callback));
} else if (done) {
done(status);
}
};
auto status = client_impl_->GetGcsPubSub().Subscribe(TASK_CHANNEL, task_id.Hex(),
on_subscribe, on_done);
RAY_LOG(DEBUG) << "Finished subscribing task, task id = " << task_id;
return status;
}
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id,
const StatusCallback &done) {
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id) {
RAY_LOG(DEBUG) << "Unsubscribing task, task id = " << task_id;
auto status = task_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, done);
auto status = client_impl_->GetGcsPubSub().Unsubscribe(TASK_CHANNEL, task_id.Hex());
RAY_LOG(DEBUG) << "Finished unsubscribing task, task id = " << task_id;
return status;
}
@@ -683,10 +703,10 @@ Status ServiceBasedTaskInfoAccessor::AsyncSubscribeTaskLease(
return status;
}
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(
const TaskID &task_id, const StatusCallback &done) {
Status ServiceBasedTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) {
RAY_LOG(DEBUG) << "Unsubscribing task lease, task id = " << task_id;
auto status = task_lease_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, done);
auto status =
task_lease_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, nullptr);
RAY_LOG(DEBUG) << "Finished unsubscribing task lease, task id = " << task_id;
return status;
}
@@ -212,7 +212,7 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor {
const SubscribeCallback<TaskID, rpc::TaskTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribe(const TaskID &task_id, const StatusCallback &done) override;
Status AsyncUnsubscribe(const TaskID &task_id) override;
Status AsyncAddTaskLease(const std::shared_ptr<rpc::TaskLeaseData> &data_ptr,
const StatusCallback &callback) override;
@@ -222,8 +222,7 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor {
const SubscribeCallback<TaskID, boost::optional<rpc::TaskLeaseData>> &subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribeTaskLease(const TaskID &task_id,
const StatusCallback &done) override;
Status AsyncUnsubscribeTaskLease(const TaskID &task_id) override;
Status AttemptTaskReconstruction(
const std::shared_ptr<rpc::TaskReconstructionData> &data_ptr,
@@ -234,10 +233,6 @@ class ServiceBasedTaskInfoAccessor : public TaskInfoAccessor {
ClientID subscribe_id_;
typedef SubscriptionExecutor<TaskID, TaskTableData, raylet::TaskTable>
TaskSubscriptionExecutor;
TaskSubscriptionExecutor task_sub_executor_;
typedef SubscriptionExecutor<TaskID, boost::optional<TaskLeaseData>, TaskLeaseTable>
TaskLeaseSubscriptionExecutor;
TaskLeaseSubscriptionExecutor task_lease_sub_executor_;
@@ -293,11 +293,9 @@ class ServiceBasedGcsClientTest : public RedisServiceManagerForTest {
return WaitReady(promise.get_future(), timeout_ms_);
}
bool UnsubscribeTask(const TaskID &task_id) {
void UnsubscribeTask(const TaskID &task_id) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(
task_id, [&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(task_id));
}
bool AddTask(const std::shared_ptr<rpc::TaskTableData> task) {
@@ -340,11 +338,9 @@ class ServiceBasedGcsClientTest : public RedisServiceManagerForTest {
return WaitReady(promise.get_future(), timeout_ms_);
}
bool UnsubscribeTaskLease(const TaskID &task_id) {
void UnsubscribeTaskLease(const TaskID &task_id) {
std::promise<bool> promise;
RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribeTaskLease(
task_id, [&promise](Status status) { promise.set_value(status.ok()); }));
return WaitReady(promise.get_future(), timeout_ms_);
RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribeTaskLease(task_id));
}
bool AddTaskLease(const std::shared_ptr<rpc::TaskLeaseData> task_lease) {
@@ -688,7 +684,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) {
ASSERT_TRUE(get_task_result.task().task_spec().job_id() == job_id.Binary());
// Cancel subscription to a task.
ASSERT_TRUE(UnsubscribeTask(task_id));
UnsubscribeTask(task_id);
// Add a task to GCS again.
ASSERT_TRUE(AddTask(task_table_data));
@@ -717,7 +713,7 @@ TEST_F(ServiceBasedGcsClientTest, TestTaskInfo) {
WaitPendingDone(task_lease_count, 2);
// Cancel subscription to a task lease.
ASSERT_TRUE(UnsubscribeTaskLease(task_id));
UnsubscribeTaskLease(task_id);
// Add a task lease to GCS again.
ASSERT_TRUE(AddTaskLease(task_lease));
+1 -1
View File
@@ -167,7 +167,7 @@ void GcsServer::StoreGcsServerAddressInRedis() {
std::unique_ptr<rpc::TaskInfoHandler> GcsServer::InitTaskInfoHandler() {
return std::unique_ptr<rpc::DefaultTaskInfoHandler>(
new rpc::DefaultTaskInfoHandler(*redis_gcs_client_));
new rpc::DefaultTaskInfoHandler(*redis_gcs_client_, gcs_pub_sub_));
}
std::unique_ptr<rpc::StatsHandler> GcsServer::InitStatsHandler() {
@@ -25,20 +25,24 @@ void DefaultTaskInfoHandler::HandleAddTask(const AddTaskRequest &request,
RAY_LOG(DEBUG) << "Adding task, job id = " << job_id << ", task id = " << task_id;
auto task_table_data = std::make_shared<TaskTableData>();
task_table_data->CopyFrom(request.task_data());
auto on_done = [job_id, task_id, request, reply, send_reply_callback](Status status) {
auto on_done = [this, job_id, task_id, task_table_data, request, reply,
send_reply_callback](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to add task, job id = " << job_id
<< ", task id = " << task_id;
} else {
RAY_CHECK_OK(gcs_pub_sub_->Publish(TASK_CHANNEL, task_id.Hex(),
task_table_data->SerializeAsString(), nullptr));
RAY_LOG(DEBUG) << "Finished adding task, job id = " << job_id
<< ", task id = " << task_id;
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
Status status = gcs_client_.Tasks().AsyncAdd(task_table_data, on_done);
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished adding task, job id = " << job_id
<< ", task id = " << task_id;
}
void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request,
@@ -48,23 +52,20 @@ void DefaultTaskInfoHandler::HandleGetTask(const GetTaskRequest &request,
RAY_LOG(DEBUG) << "Getting task, job id = " << task_id.JobId()
<< ", task id = " << task_id;
auto on_done = [task_id, request, reply, send_reply_callback](
Status status, const boost::optional<TaskTableData> &result) {
const Status &status, const boost::optional<TaskTableData> &result) {
if (status.ok()) {
RAY_DCHECK(result);
reply->mutable_task_data()->CopyFrom(*result);
} else {
RAY_LOG(ERROR) << "Failed to get task, job id = " << task_id.JobId()
<< ", task id = " << task_id;
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
RAY_LOG(DEBUG) << "Finished getting task, job id = " << task_id.JobId()
<< ", task id = " << task_id << ", status = " << status.ToString();
GCS_RPC_SEND_REPLY(send_reply_callback, reply, Status::OK());
};
Status status = gcs_client_.Tasks().AsyncGet(task_id, on_done);
if (!status.ok()) {
on_done(status, boost::none);
}
RAY_LOG(DEBUG) << "Finished getting task, job id = " << task_id.JobId()
<< ", task id = " << task_id;
}
void DefaultTaskInfoHandler::HandleDeleteTasks(const DeleteTasksRequest &request,
@@ -127,12 +128,18 @@ void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction(
<< ", node id = " << node_id;
auto task_reconstruction_data = std::make_shared<TaskReconstructionData>();
task_reconstruction_data->CopyFrom(request.task_reconstruction());
auto on_done = [task_id, node_id, request, reply, send_reply_callback](Status status) {
auto on_done = [task_id, node_id, request, reply,
send_reply_callback](const Status &status) {
if (!status.ok()) {
RAY_LOG(ERROR) << "Failed to reconstruct task, job id = " << task_id.JobId()
<< ", task id = " << task_id << ", reconstructions num = "
<< request.task_reconstruction().num_reconstructions()
<< ", node id = " << node_id;
} else {
RAY_LOG(DEBUG) << "Finished reconstructing task, job id = " << task_id.JobId()
<< ", task id = " << task_id << ", reconstructions num = "
<< request.task_reconstruction().num_reconstructions()
<< ", node id = " << node_id;
}
GCS_RPC_SEND_REPLY(send_reply_callback, reply, status);
};
@@ -142,10 +149,6 @@ void DefaultTaskInfoHandler::HandleAttemptTaskReconstruction(
if (!status.ok()) {
on_done(status);
}
RAY_LOG(DEBUG) << "Finished reconstructing task, job id = " << task_id.JobId()
<< ", task id = " << task_id << ", reconstructions num = "
<< request.task_reconstruction().num_reconstructions()
<< ", node id = " << node_id;
}
} // namespace rpc
@@ -15,6 +15,7 @@
#ifndef RAY_GCS_TASK_INFO_HANDLER_IMPL_H
#define RAY_GCS_TASK_INFO_HANDLER_IMPL_H
#include "ray/gcs/pubsub/gcs_pub_sub.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/rpc/gcs_server/gcs_rpc_server.h"
@@ -24,8 +25,9 @@ namespace rpc {
/// This implementation class of `TaskInfoHandler`.
class DefaultTaskInfoHandler : public rpc::TaskInfoHandler {
public:
explicit DefaultTaskInfoHandler(gcs::RedisGcsClient &gcs_client)
: gcs_client_(gcs_client) {}
explicit DefaultTaskInfoHandler(gcs::RedisGcsClient &gcs_client,
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub)
: gcs_client_(gcs_client), gcs_pub_sub_(gcs_pub_sub) {}
void HandleAddTask(const AddTaskRequest &request, AddTaskReply *reply,
SendReplyCallback send_reply_callback) override;
@@ -45,6 +47,7 @@ class DefaultTaskInfoHandler : public rpc::TaskInfoHandler {
private:
gcs::RedisGcsClient &gcs_client_;
std::shared_ptr<gcs::GcsPubSub> &gcs_pub_sub_;
};
} // namespace rpc
+1
View File
@@ -28,6 +28,7 @@ namespace gcs {
#define JOB_CHANNEL "JOB"
#define WORKER_FAILURE_CHANNEL "WORKER_FAILURE"
#define OBJECT_CHANNEL "OBJECT"
#define TASK_CHANNEL "TASK"
/// \class GcsPubSub
///
+4 -6
View File
@@ -389,9 +389,8 @@ Status RedisTaskInfoAccessor::AsyncSubscribe(
return task_sub_executor_.AsyncSubscribe(subscribe_id_, task_id, subscribe, done);
}
Status RedisTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id,
const StatusCallback &done) {
return task_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, done);
Status RedisTaskInfoAccessor::AsyncUnsubscribe(const TaskID &task_id) {
return task_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, nullptr);
}
Status RedisTaskInfoAccessor::AsyncAddTaskLease(
@@ -414,9 +413,8 @@ Status RedisTaskInfoAccessor::AsyncSubscribeTaskLease(
return task_lease_sub_executor_.AsyncSubscribe(subscribe_id_, task_id, subscribe, done);
}
Status RedisTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id,
const StatusCallback &done) {
return task_lease_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, done);
Status RedisTaskInfoAccessor::AsyncUnsubscribeTaskLease(const TaskID &task_id) {
return task_lease_sub_executor_.AsyncUnsubscribe(subscribe_id_, task_id, nullptr);
}
Status RedisTaskInfoAccessor::AttemptTaskReconstruction(
+2 -3
View File
@@ -195,7 +195,7 @@ class RedisTaskInfoAccessor : public TaskInfoAccessor {
const SubscribeCallback<TaskID, TaskTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribe(const TaskID &task_id, const StatusCallback &done) override;
Status AsyncUnsubscribe(const TaskID &task_id) override;
Status AsyncAddTaskLease(const std::shared_ptr<TaskLeaseData> &data_ptr,
const StatusCallback &callback) override;
@@ -205,8 +205,7 @@ class RedisTaskInfoAccessor : public TaskInfoAccessor {
const SubscribeCallback<TaskID, boost::optional<TaskLeaseData>> &subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribeTaskLease(const TaskID &task_id,
const StatusCallback &done) override;
Status AsyncUnsubscribeTaskLease(const TaskID &task_id) override;
Status AttemptTaskReconstruction(
const std::shared_ptr<TaskReconstructionData> &data_ptr,
+1 -1
View File
@@ -321,7 +321,7 @@ bool LineageCache::UnsubscribeTask(const TaskID &task_id) {
bool subscribed = (it != subscribed_tasks_.end());
if (subscribed) {
// Cancel subscribe to the task.
RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(task_id, /*done*/ nullptr));
RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribe(task_id));
subscribed_tasks_.erase(it);
}
// Return whether we were previously subscribed to this task and are now
+1 -1
View File
@@ -98,7 +98,7 @@ class MockTaskInfoAccessor : public gcs::RedisTaskInfoAccessor {
return ray::Status::OK();
}
Status AsyncUnsubscribe(const TaskID &task_id, const gcs::StatusCallback &done) {
Status AsyncUnsubscribe(const TaskID &task_id) {
subscribed_tasks_.erase(task_id);
return ray::Status::OK();
}
+1 -2
View File
@@ -237,8 +237,7 @@ void ReconstructionPolicy::Cancel(const ObjectID &object_id) {
if (it->second.created_objects.empty()) {
// Cancel notifications for the task lease if we were subscribed to them.
if (it->second.subscribed) {
RAY_CHECK_OK(
gcs_client_->Tasks().AsyncUnsubscribeTaskLease(task_id, /*done*/ nullptr));
RAY_CHECK_OK(gcs_client_->Tasks().AsyncUnsubscribeTaskLease(task_id));
}
listening_tasks_.erase(it);
}
+1 -2
View File
@@ -128,8 +128,7 @@ class MockTaskInfoAccessor : public gcs::RedisTaskInfoAccessor {
return ray::Status::OK();
}
Status AsyncUnsubscribeTaskLease(const TaskID &task_id,
const gcs::StatusCallback &done) override {
Status AsyncUnsubscribeTaskLease(const TaskID &task_id) override {
subscribed_tasks_.erase(task_id);
return ray::Status::OK();
}