[GCS]Add abstract interface of actor to GCS Client (#6269)

This commit is contained in:
micafan
2019-12-05 13:38:29 +08:00
committed by Hao Chen
parent 7611e484ec
commit 668ce47360
14 changed files with 204 additions and 149 deletions
+2 -2
View File
@@ -718,9 +718,9 @@ cc_binary(
)
cc_binary(
name = "actor_state_accessor_test",
name = "redis_actor_info_accessor_test",
testonly = 1,
srcs = ["src/ray/gcs/actor_state_accessor_test.cc"],
srcs = ["src/ray/gcs/redis_actor_info_accessor_test.cc"],
copts = COPTS,
deps = [
":gcs",
+1 -1
View File
@@ -223,7 +223,7 @@ cdef extern from "ray/core_worker/common.h" nogil:
const c_vector[c_string] &dynamic_worker_options,
c_bool is_detached, c_bool is_asyncio)
cdef extern from "ray/gcs/gcs_client_interface.h" nogil:
cdef extern from "ray/gcs/gcs_client.h" nogil:
cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions":
CGcsClientOptions(const c_string &ip, int port,
const c_string &password,
+5
View File
@@ -1,3 +1,6 @@
#ifndef RAY_GCS_ACCESSOR_TEST_BASE_H
#define RAY_GCS_ACCESSOR_TEST_BASE_H
#include <atomic>
#include <chrono>
#include <string>
@@ -74,3 +77,5 @@ class AccessorTestBase : public ::testing::Test {
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_ACCESSOR_TEST_BASE_H
+87
View File
@@ -0,0 +1,87 @@
#ifndef RAY_GCS_ACTOR_INFO_ACCESSOR_H
#define RAY_GCS_ACTOR_INFO_ACCESSOR_H
#include "ray/common/id.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/subscription_executor.h"
namespace ray {
namespace gcs {
/// \class ActorInfoAccessor
/// `ActorInfoAccessor` is a sub-interface of `GcsClient`.
/// This class includes all the methods that are related to accessing
/// actor information in the GCS.
class ActorInfoAccessor {
public:
virtual ~ActorInfoAccessor() = default;
/// Get actor specification from GCS asynchronously.
///
/// \param actor_id The ID of actor to look up in the GCS.
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
virtual Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<ActorTableData> &callback) = 0;
/// Register an actor to GCS asynchronously.
///
/// \param data_ptr The actor that will be registered to the GCS.
/// \param callback Callback that will be called after actor has been registered
/// to the GCS.
/// \return Status
virtual Status AsyncRegister(const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Update dynamic states of actor in GCS asynchronously.
///
/// \param actor_id ID of the actor to update.
/// \param data_ptr Data of the actor to update.
/// \param callback Callback that will be called after update finishes.
/// \return Status
/// TODO(micafan) Don't expose the whole `ActorTableData` and only allow
/// updating dynamic states.
virtual Status AsyncUpdate(const ActorID &actor_id,
const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) = 0;
/// Subscribe to any register or update operations of actors.
///
/// \param subscribe Callback that will be called each time when an actor is registered
/// or updated.
/// \param done Callback that will be called when subscription is complete and we
/// are ready to receive notification.
/// \return Status
virtual Status AsyncSubscribeAll(
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) = 0;
/// Subscribe to any update operations of an actor.
///
/// \param actor_id The ID of actor to be subscribed to.
/// \param subscribe Callback that will be called each time when the actor is updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
virtual Status AsyncSubscribe(
const ActorID &actor_id,
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) = 0;
/// Cancel subscription to an actor.
///
/// \param actor_id The ID of the actor to be unsubscribed to.
/// \param done Callback that will be called when unsubscribe is complete.
/// \return Status
virtual Status AsyncUnsubscribe(const ActorID &actor_id,
const StatusCallback &done) = 0;
protected:
ActorInfoAccessor() = default;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_ACTOR_INFO_ACCESSOR_H
-100
View File
@@ -1,100 +0,0 @@
#ifndef RAY_GCS_ACTOR_STATE_ACCESSOR_H
#define RAY_GCS_ACTOR_STATE_ACCESSOR_H
#include "ray/common/id.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/subscription_executor.h"
#include "ray/gcs/tables.h"
namespace ray {
namespace gcs {
class RedisGcsClient;
/// \class ActorStateAccessor
/// ActorStateAccessor class encapsulates the implementation details of
/// reading or writing or subscribing of actor's specification (immutable fields which
/// determined at submission time, and mutable fields which are determined at runtime).
class ActorStateAccessor {
public:
explicit ActorStateAccessor(RedisGcsClient &client_impl);
~ActorStateAccessor() {}
/// Get actor specification from GCS asynchronously.
///
/// \param actor_id The ID of actor to look up in the GCS.
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<ActorTableData> &callback);
/// Register an actor to GCS asynchronously.
///
/// \param data_ptr The actor that will be registered to the GCS.
/// \param callback Callback that will be called after actor has been registered
/// to the GCS.
/// \return Status
Status AsyncRegister(const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback);
/// Update dynamic states of actor in GCS asynchronously.
///
/// \param actor_id ID of the actor to update.
/// \param data_ptr Data of the actor to update.
/// \param callback Callback that will be called after update finishes.
/// \return Status
/// TODO(micafan) Don't expose the whole `ActorTableData` and only allow
/// updating dynamic states.
Status AsyncUpdate(const ActorID &actor_id,
const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback);
/// Subscribe to any register or update operations of actors.
///
/// \param subscribe Callback that will be called each time when an actor is registered
/// or updated.
/// \param done Callback that will be called when subscription is complete and we
/// are ready to receive notification.
/// \return Status
Status AsyncSubscribe(const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done);
/// Subscribe to any update operations of an actor.
///
/// \param actor_id The ID of actor to be subscribed to.
/// \param subscribe Callback that will be called each time when the actor is updated.
/// \param done Callback that will be called when subscription is complete.
/// \return Status
Status AsyncSubscribe(const ActorID &actor_id,
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done);
/// Cancel subscription to an actor.
///
/// \param actor_id The ID of the actor to be unsubscribed to.
/// \param done Callback that will be called when unsubscribe is complete.
/// \return Status
Status AsyncUnsubscribe(const ActorID &actor_id, const StatusCallback &done);
private:
RedisGcsClient &client_impl_;
// Use a random ClientID for actor subscription. Because:
// If we use ClientID::Nil, GCS will still send all actors' updates to this GCS Client.
// Even we can filter out irrelevant updates, but there will be extra overhead.
// And because the new GCS Client will no longer hold the local ClientID, so we use
// random ClientID instead.
// TODO(micafan): Remove this random id, once GCS becomes a service.
ClientID node_id_{ClientID::FromRandom()};
typedef SubscriptionExecutor<ActorID, ActorTableData, ActorTable>
ActorSubscriptionExecutor;
ActorSubscriptionExecutor actor_sub_executor_;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_ACTOR_STATE_ACCESSOR_H
@@ -6,7 +6,7 @@
#include <string>
#include <vector>
#include "ray/common/status.h"
#include "ray/gcs/actor_state_accessor.h"
#include "ray/gcs/actor_info_accessor.h"
#include "ray/util/logging.h"
namespace ray {
@@ -62,14 +62,14 @@ class GcsClientOptions {
bool is_test_client_{false};
};
/// \class GcsClientInterface
/// \class GcsClient
/// Abstract interface of the GCS client.
///
/// To read and write from the GCS, `Connect()` must be called and return Status::OK.
/// Before exit, `Disconnect()` must be called.
class GcsClientInterface : public std::enable_shared_from_this<GcsClientInterface> {
class GcsClient : public std::enable_shared_from_this<GcsClient> {
public:
virtual ~GcsClientInterface() {}
virtual ~GcsClient() {}
/// Connect to GCS Service. Non-thread safe.
/// This function must be called before calling other functions.
@@ -80,25 +80,25 @@ class GcsClientInterface : public std::enable_shared_from_this<GcsClientInterfac
/// Disconnect with GCS Service. Non-thread safe.
virtual void Disconnect() = 0;
/// Get ActorStateAccessor for reading or writing or subscribing to
/// Get ActorInfoAccessor for reading or writing or subscribing to
/// actors. This function is thread safe.
ActorStateAccessor &Actors() {
ActorInfoAccessor &Actors() {
RAY_CHECK(actor_accessor_ != nullptr);
return *actor_accessor_;
}
protected:
/// Constructor of GcsClientInterface.
/// Constructor of GcsClient.
///
/// \param options Options for client.
GcsClientInterface(const GcsClientOptions &options) : options_(options) {}
GcsClient(const GcsClientOptions &options) : options_(options) {}
GcsClientOptions options_;
/// Whether this client is connected to GCS.
bool is_connected_{false};
std::unique_ptr<ActorStateAccessor> actor_accessor_;
std::unique_ptr<ActorInfoAccessor> actor_accessor_;
};
} // namespace gcs
@@ -1,4 +1,4 @@
#include "ray/gcs/actor_state_accessor.h"
#include "ray/gcs/redis_actor_info_accessor.h"
#include <boost/none.hpp>
#include "ray/gcs/redis_gcs_client.h"
#include "ray/util/logging.h"
@@ -7,10 +7,10 @@ namespace ray {
namespace gcs {
ActorStateAccessor::ActorStateAccessor(RedisGcsClient &client_impl)
: client_impl_(client_impl), actor_sub_executor_(client_impl_.actor_table()) {}
RedisActorInfoAccessor::RedisActorInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl), actor_sub_executor_(client_impl_->actor_table()) {}
Status ActorStateAccessor::AsyncGet(
Status RedisActorInfoAccessor::AsyncGet(
const ActorID &actor_id, const OptionalItemCallback<ActorTableData> &callback) {
RAY_CHECK(callback != nullptr);
auto on_done = [callback](RedisGcsClient *client, const ActorID &actor_id,
@@ -22,12 +22,11 @@ Status ActorStateAccessor::AsyncGet(
callback(Status::OK(), result);
};
ActorTable &actor_table = client_impl_.actor_table();
return actor_table.Lookup(JobID::Nil(), actor_id, on_done);
return client_impl_->actor_table().Lookup(JobID::Nil(), actor_id, on_done);
}
Status ActorStateAccessor::AsyncRegister(const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) {
Status RedisActorInfoAccessor::AsyncRegister(
const std::shared_ptr<ActorTableData> &data_ptr, const StatusCallback &callback) {
auto on_success = [callback](RedisGcsClient *client, const ActorID &actor_id,
const ActorTableData &data) {
if (callback != nullptr) {
@@ -43,14 +42,14 @@ Status ActorStateAccessor::AsyncRegister(const std::shared_ptr<ActorTableData> &
};
ActorID actor_id = ActorID::FromBinary(data_ptr->actor_id());
ActorTable &actor_table = client_impl_.actor_table();
return actor_table.AppendAt(JobID::Nil(), actor_id, data_ptr, on_success, on_failure,
/*log_length*/ 0);
return client_impl_->actor_table().AppendAt(JobID::Nil(), actor_id, data_ptr,
on_success, on_failure,
/*log_length*/ 0);
}
Status ActorStateAccessor::AsyncUpdate(const ActorID &actor_id,
const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) {
Status RedisActorInfoAccessor::AsyncUpdate(
const ActorID &actor_id, const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) {
// The actor log starts with an ALIVE entry. This is followed by 0 to N pairs
// of (RECONSTRUCTING, ALIVE) entries, where N is the maximum number of
// reconstructions. This is followed optionally by a DEAD entry.
@@ -85,27 +84,26 @@ Status ActorStateAccessor::AsyncUpdate(const ActorID &actor_id,
}
};
ActorTable &actor_table = client_impl_.actor_table();
return actor_table.AppendAt(JobID::Nil(), actor_id, data_ptr, on_success, on_failure,
log_length);
return client_impl_->actor_table().AppendAt(JobID::Nil(), actor_id, data_ptr,
on_success, on_failure, log_length);
}
Status ActorStateAccessor::AsyncSubscribe(
Status RedisActorInfoAccessor::AsyncSubscribeAll(
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
return actor_sub_executor_.AsyncSubscribe(ClientID::Nil(), subscribe, done);
}
Status ActorStateAccessor::AsyncSubscribe(
Status RedisActorInfoAccessor::AsyncSubscribe(
const ActorID &actor_id, const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) {
RAY_CHECK(subscribe != nullptr);
return actor_sub_executor_.AsyncSubscribe(node_id_, actor_id, subscribe, done);
}
Status ActorStateAccessor::AsyncUnsubscribe(const ActorID &actor_id,
const StatusCallback &done) {
Status RedisActorInfoAccessor::AsyncUnsubscribe(const ActorID &actor_id,
const StatusCallback &done) {
return actor_sub_executor_.AsyncUnsubscribe(node_id_, actor_id, done);
}
+63
View File
@@ -0,0 +1,63 @@
#ifndef RAY_GCS_REDIS_ACTOR_INFO_ACCESSOR_H
#define RAY_GCS_REDIS_ACTOR_INFO_ACCESSOR_H
#include "ray/common/id.h"
#include "ray/gcs/actor_info_accessor.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/subscription_executor.h"
#include "ray/gcs/tables.h"
namespace ray {
namespace gcs {
class RedisGcsClient;
/// \class RedisActorInfoAccessor
/// `RedisActorInfoAccessor` is an implementation of `ActorInfoAccessor`
/// that uses Redis as the backend storage.
class RedisActorInfoAccessor : public ActorInfoAccessor {
public:
explicit RedisActorInfoAccessor(RedisGcsClient *client_impl);
virtual ~RedisActorInfoAccessor() {}
Status AsyncGet(const ActorID &actor_id,
const OptionalItemCallback<ActorTableData> &callback) override;
Status AsyncRegister(const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncUpdate(const ActorID &actor_id,
const std::shared_ptr<ActorTableData> &data_ptr,
const StatusCallback &callback) override;
Status AsyncSubscribeAll(const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncSubscribe(const ActorID &actor_id,
const SubscribeCallback<ActorID, ActorTableData> &subscribe,
const StatusCallback &done) override;
Status AsyncUnsubscribe(const ActorID &actor_id, const StatusCallback &done) override;
private:
RedisGcsClient *client_impl_{nullptr};
// Use a random ClientID for actor subscription. Because:
// If we use ClientID::Nil, GCS will still send all actors' updates to this GCS Client.
// Even we can filter out irrelevant updates, but there will be extra overhead.
// And because the new GCS Client will no longer hold the local ClientID, so we use
// random ClientID instead.
// TODO(micafan): Remove this random id, once GCS becomes a service.
ClientID node_id_{ClientID::FromRandom()};
typedef SubscriptionExecutor<ActorID, ActorTableData, ActorTable>
ActorSubscriptionExecutor;
ActorSubscriptionExecutor actor_sub_executor_;
};
} // namespace gcs
} // namespace ray
#endif // RAY_GCS_REDIS_ACTOR_INFO_ACCESSOR_H
@@ -12,7 +12,7 @@ namespace ray {
namespace gcs {
class ActorStateAccessorTest : public AccessorTestBase<ActorID, ActorTableData> {
class ActorInfoAccessorTest : public AccessorTestBase<ActorID, ActorTableData> {
protected:
virtual void GenTestData() {
for (size_t i = 0; i < 100; ++i) {
@@ -29,8 +29,8 @@ class ActorStateAccessorTest : public AccessorTestBase<ActorID, ActorTableData>
}
};
TEST_F(ActorStateAccessorTest, RegisterAndGet) {
ActorStateAccessor &actor_accessor = gcs_client_->Actors();
TEST_F(ActorInfoAccessorTest, RegisterAndGet) {
ActorInfoAccessor &actor_accessor = gcs_client_->Actors();
// register
for (const auto &elem : id_to_data_) {
const auto &actor = elem.second;
@@ -59,8 +59,8 @@ TEST_F(ActorStateAccessorTest, RegisterAndGet) {
WaitPendingDone(wait_pending_timeout_);
}
TEST_F(ActorStateAccessorTest, Subscribe) {
ActorStateAccessor &actor_accessor = gcs_client_->Actors();
TEST_F(ActorInfoAccessorTest, Subscribe) {
ActorInfoAccessor &actor_accessor = gcs_client_->Actors();
// subscribe
std::atomic<int> sub_pending_count(0);
std::atomic<int> do_sub_pending_count(0);
@@ -76,7 +76,7 @@ TEST_F(ActorStateAccessorTest, Subscribe) {
};
++do_sub_pending_count;
RAY_CHECK_OK(actor_accessor.AsyncSubscribe(subscribe, done));
RAY_CHECK_OK(actor_accessor.AsyncSubscribeAll(subscribe, done));
// Wait until subscribe finishes.
WaitPendingDone(do_sub_pending_count, wait_pending_timeout_);
+3 -3
View File
@@ -1,6 +1,7 @@
#include "ray/gcs/redis_gcs_client.h"
#include "ray/common/ray_config.h"
#include "ray/gcs/redis_actor_info_accessor.h"
#include "ray/gcs/redis_context.h"
#include <unistd.h>
@@ -72,8 +73,7 @@ namespace ray {
namespace gcs {
RedisGcsClient::RedisGcsClient(const GcsClientOptions &options)
: GcsClientInterface(options) {}
RedisGcsClient::RedisGcsClient(const GcsClientOptions &options) : GcsClient(options) {}
Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
RAY_CHECK(!is_connected_);
@@ -137,7 +137,7 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
actor_checkpoint_id_table_.reset(new ActorCheckpointIdTable(shard_contexts_, this));
resource_table_.reset(new DynamicResourceTable({primary_context_}, this));
actor_accessor_.reset(new ActorStateAccessor(*this));
actor_accessor_.reset(new RedisActorInfoAccessor(this));
Status status = Attach(io_service);
is_connected_ = status.ok();
+5 -3
View File
@@ -7,7 +7,7 @@
#include "ray/common/id.h"
#include "ray/common/status.h"
#include "ray/gcs/asio.h"
#include "ray/gcs/gcs_client_interface.h"
#include "ray/gcs/gcs_client.h"
#include "ray/gcs/tables.h"
#include "ray/util/logging.h"
@@ -17,8 +17,10 @@ namespace gcs {
class RedisContext;
class RAY_EXPORT RedisGcsClient : public GcsClientInterface {
friend class ActorStateAccessor;
class RAY_EXPORT RedisGcsClient : public GcsClient {
// TODO(micafan) Will remove those friend class after we replace RedisGcsClient
// with interface class GcsClient in raylet.
friend class RedisActorInfoAccessor;
friend class SubscriptionExecutorTest;
public:
+1 -1
View File
@@ -56,7 +56,7 @@ class SubscriptionExecutorTest : public AccessorTestBase<ActorID, ActorTableData
}
size_t AsyncRegisterActorToGcs() {
ActorStateAccessor &actor_accessor = gcs_client_->Actors();
ActorInfoAccessor &actor_accessor = gcs_client_->Actors();
for (const auto &elem : id_to_data_) {
const auto &actor = elem.second;
auto done = [this](Status status) {
+1 -1
View File
@@ -183,7 +183,7 @@ ray::Status NodeManager::RegisterGcs() {
};
RAY_RETURN_NOT_OK(
gcs_client_->Actors().AsyncSubscribe(actor_notification_callback, nullptr));
gcs_client_->Actors().AsyncSubscribeAll(actor_notification_callback, nullptr));
// Register a callback on the client table for new clients.
auto node_manager_client_added = [this](gcs::RedisGcsClient *client, const UniqueID &id,
+2 -2
View File
@@ -6,7 +6,7 @@
set -e
set -x
bazel build "//:redis_gcs_client_test" "//:actor_state_accessor_test" "//:subscription_executor_test" "//:asio_test" "//:libray_redis_module.so"
bazel build "//:redis_gcs_client_test" "//:redis_actor_info_accessor_test" "//:subscription_executor_test" "//:asio_test" "//:libray_redis_module.so"
# Start Redis.
./bazel-bin/redis-server \
@@ -16,7 +16,7 @@ bazel build "//:redis_gcs_client_test" "//:actor_state_accessor_test" "//:subsc
sleep 1s
./bazel-bin/redis_gcs_client_test
./bazel-bin/actor_state_accessor_test
./bazel-bin/redis_actor_info_accessor_test
./bazel-bin/subscription_executor_test
./bazel-bin/asio_test