modify ActorStateAccessor::AsyncGet callback (#5417)

This commit is contained in:
micafan
2019-08-21 10:54:33 +08:00
committed by Hao Chen
parent e065f55998
commit 52a7c1d673
5 changed files with 20 additions and 16 deletions
+7 -3
View File
@@ -10,12 +10,16 @@ namespace gcs {
ActorStateAccessor::ActorStateAccessor(RedisGcsClient &client_impl)
: client_impl_(client_impl), actor_sub_executor_(client_impl_.actor_table()) {}
Status ActorStateAccessor::AsyncGet(const ActorID &actor_id,
const MultiItemCallback<ActorTableData> &callback) {
Status ActorStateAccessor::AsyncGet(
const ActorID &actor_id, const OptionalItemCallback<ActorTableData> &callback) {
RAY_CHECK(callback != nullptr);
auto on_done = [callback](RedisGcsClient *client, const ActorID &actor_id,
const std::vector<ActorTableData> &data) {
callback(Status::OK(), data);
boost::optional<ActorTableData> result;
if (!data.empty()) {
result = data.back();
}
callback(Status::OK(), result);
};
ActorTable &actor_table = client_impl_.actor_table();
+1 -1
View File
@@ -28,7 +28,7 @@ class ActorStateAccessor {
/// \param callback Callback that will be called after lookup finishes.
/// \return Status
Status AsyncGet(const ActorID &actor_id,
const MultiItemCallback<ActorTableData> &callback);
const OptionalItemCallback<ActorTableData> &callback);
/// Register an actor to GCS asynchronously.
///
+3 -3
View File
@@ -47,9 +47,9 @@ TEST_F(ActorStateAccessorTest, RegisterAndGet) {
for (const auto &elem : id_to_data_) {
++pending_count_;
RAY_CHECK_OK(actor_accessor.AsyncGet(
elem.first, [this](Status status, std::vector<ActorTableData> datas) {
ASSERT_EQ(datas.size(), 1U);
ActorID actor_id = ActorID::FromBinary(datas[0].actor_id());
elem.first, [this](Status status, const boost::optional<ActorTableData> &data) {
ASSERT_TRUE(data);
ActorID actor_id = ActorID::FromBinary(data->actor_id());
auto it = id_to_data_.find(actor_id);
ASSERT_TRUE(it != id_to_data_.end());
--pending_count_;
+1 -1
View File
@@ -19,7 +19,7 @@ using StatusCallback = std::function<void(Status status)>;
/// this optional object is empty.
template <typename Data>
using OptionalItemCallback =
std::function<void(Status status, boost::optional<Data> result)>;
std::function<void(Status status, const boost::optional<Data> &result)>;
/// This callback is used to receive multiple items from GCS when a read completes.
/// \param status Status indicates whether the read was successful.
+8 -8
View File
@@ -1537,14 +1537,14 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag
// was already created. Look up the actor's registered location in case
// we missed the creation notification.
const ActorID &actor_id = spec.ActorId();
auto lookup_callback = [this, actor_id](Status status,
const std::vector<ActorTableData> &data) {
if (!data.empty()) {
// The actor has been created. We only need the last entry, because
// it represents the latest state of this actor.
HandleActorStateTransition(actor_id, ActorRegistration(data.back()));
}
};
auto lookup_callback =
[this, actor_id](Status status, const boost::optional<ActorTableData> &data) {
if (data) {
// The actor has been created. We only need the last entry, because
// it represents the latest state of this actor.
HandleActorStateTransition(actor_id, ActorRegistration(*data));
}
};
RAY_CHECK_OK(gcs_client_->Actors().AsyncGet(actor_id, lookup_callback));
actor_creation_dummy_object = spec.ActorCreationDummyObjectId();
} else {