diff --git a/src/ray/gcs/actor_state_accessor.cc b/src/ray/gcs/actor_state_accessor.cc index f76af65cb..4cb356a32 100644 --- a/src/ray/gcs/actor_state_accessor.cc +++ b/src/ray/gcs/actor_state_accessor.cc @@ -10,12 +10,16 @@ namespace gcs { ActorStateAccessor::ActorStateAccessor(RedisGcsClient &client_impl) : client_impl_(client_impl), actor_sub_executor_(client_impl_.actor_table()) {} -Status ActorStateAccessor::AsyncGet(const ActorID &actor_id, - const MultiItemCallback &callback) { +Status ActorStateAccessor::AsyncGet( + const ActorID &actor_id, const OptionalItemCallback &callback) { RAY_CHECK(callback != nullptr); auto on_done = [callback](RedisGcsClient *client, const ActorID &actor_id, const std::vector &data) { - callback(Status::OK(), data); + boost::optional result; + if (!data.empty()) { + result = data.back(); + } + callback(Status::OK(), result); }; ActorTable &actor_table = client_impl_.actor_table(); diff --git a/src/ray/gcs/actor_state_accessor.h b/src/ray/gcs/actor_state_accessor.h index c6a812f4d..31e40e277 100644 --- a/src/ray/gcs/actor_state_accessor.h +++ b/src/ray/gcs/actor_state_accessor.h @@ -28,7 +28,7 @@ class ActorStateAccessor { /// \param callback Callback that will be called after lookup finishes. /// \return Status Status AsyncGet(const ActorID &actor_id, - const MultiItemCallback &callback); + const OptionalItemCallback &callback); /// Register an actor to GCS asynchronously. /// diff --git a/src/ray/gcs/actor_state_accessor_test.cc b/src/ray/gcs/actor_state_accessor_test.cc index 5a5ab1475..f7fc9bb90 100644 --- a/src/ray/gcs/actor_state_accessor_test.cc +++ b/src/ray/gcs/actor_state_accessor_test.cc @@ -47,9 +47,9 @@ TEST_F(ActorStateAccessorTest, RegisterAndGet) { for (const auto &elem : id_to_data_) { ++pending_count_; RAY_CHECK_OK(actor_accessor.AsyncGet( - elem.first, [this](Status status, std::vector datas) { - ASSERT_EQ(datas.size(), 1U); - ActorID actor_id = ActorID::FromBinary(datas[0].actor_id()); + elem.first, [this](Status status, const boost::optional &data) { + ASSERT_TRUE(data); + ActorID actor_id = ActorID::FromBinary(data->actor_id()); auto it = id_to_data_.find(actor_id); ASSERT_TRUE(it != id_to_data_.end()); --pending_count_; diff --git a/src/ray/gcs/callback.h b/src/ray/gcs/callback.h index ef21a0beb..bf074ba15 100644 --- a/src/ray/gcs/callback.h +++ b/src/ray/gcs/callback.h @@ -19,7 +19,7 @@ using StatusCallback = std::function; /// this optional object is empty. template using OptionalItemCallback = - std::function result)>; + std::function &result)>; /// This callback is used to receive multiple items from GCS when a read completes. /// \param status Status indicates whether the read was successful. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index ee7c0ece6..53b84d77d 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1537,14 +1537,14 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag // was already created. Look up the actor's registered location in case // we missed the creation notification. const ActorID &actor_id = spec.ActorId(); - auto lookup_callback = [this, actor_id](Status status, - const std::vector &data) { - if (!data.empty()) { - // The actor has been created. We only need the last entry, because - // it represents the latest state of this actor. - HandleActorStateTransition(actor_id, ActorRegistration(data.back())); - } - }; + auto lookup_callback = + [this, actor_id](Status status, const boost::optional &data) { + if (data) { + // The actor has been created. We only need the last entry, because + // it represents the latest state of this actor. + HandleActorStateTransition(actor_id, ActorRegistration(*data)); + } + }; RAY_CHECK_OK(gcs_client_->Actors().AsyncGet(actor_id, lookup_callback)); actor_creation_dummy_object = spec.ActorCreationDummyObjectId(); } else {