Port actor creation to use direct calls (#6375)

This commit is contained in:
Stephanie Wang
2019-12-12 19:50:51 -08:00
committed by Eric Liang
parent 74b454c614
commit c57dcc82d1
27 changed files with 447 additions and 175 deletions
+3 -1
View File
@@ -718,7 +718,9 @@ cdef CRayStatus task_execution_handler(
traceback_str,
job_id=None)
sys.exit(1)
except SystemExit:
except SystemExit as e:
if not hasattr(e, "is_ray_terminate"):
logger.exception("SystemExit was raised from the worker")
# Tell the core worker to exit as soon as the result objects
# are processed.
return CRayStatus.SystemExit()
+5 -2
View File
@@ -6,7 +6,6 @@ import copy
import inspect
import logging
import six
import sys
import weakref
from abc import ABCMeta, abstractmethod
@@ -815,7 +814,11 @@ def exit_actor():
ray.disconnect()
# Disconnect global state from GCS.
ray.state.state.disconnect()
sys.exit(0)
# Set a flag to indicate this is an intentional actor exit. This
# reduces log verbosity.
exit = SystemExit(0)
exit.is_ray_terminate = True
raise exit
assert False, "This process should have terminated."
else:
raise Exception("exit_actor called on a non-actor worker.")
+12 -1
View File
@@ -53,12 +53,23 @@ void TaskSpecification::ComputeResources() {
// Task specification getter methods.
TaskID TaskSpecification::TaskId() const {
if (message_->task_id().empty() /* e.g., empty proto default */) {
return TaskID::Nil();
}
return TaskID::FromBinary(message_->task_id());
}
JobID TaskSpecification::JobId() const { return JobID::FromBinary(message_->job_id()); }
JobID TaskSpecification::JobId() const {
if (message_->job_id().empty() /* e.g., empty proto default */) {
return JobID::Nil();
}
return JobID::FromBinary(message_->job_id());
}
TaskID TaskSpecification::ParentTaskId() const {
if (message_->parent_task_id().empty() /* e.g., empty proto default */) {
return TaskID::Nil();
}
return TaskID::FromBinary(message_->parent_task_id());
}
+11
View File
@@ -8,6 +8,7 @@
#include "ray/core_worker/common.h"
#include "ray/core_worker/context.h"
#include "ray/protobuf/core_worker.pb.h"
#include "ray/protobuf/gcs.pb.h"
namespace ray {
@@ -52,10 +53,20 @@ class ActorHandle {
/// ActorHandle and reset them in this method.
void Reset();
// Mark the actor handle as dead.
void MarkDead() { state_ = rpc::ActorTableData::DEAD; }
// Returns whether the actor is known to be dead.
bool IsDead() const { return state_ == rpc::ActorTableData::DEAD; }
private:
// Protobuf-defined persistent state of the actor handle.
const ray::rpc::ActorHandle inner_;
/// The actor's state (alive or dead). This defaults to ALIVE. Once marked
/// DEAD, the actor handle can never go back to being ALIVE.
rpc::ActorTableData::ActorState state_ = rpc::ActorTableData::ALIVE;
/// The unique id of the dummy object returned by the previous task.
/// TODO: This can be removed once we schedule actor tasks by task counter
/// only.
+21
View File
@@ -0,0 +1,21 @@
#include "ray/core_worker/actor_manager.h"
#include "ray/gcs/redis_actor_info_accessor.h"
namespace ray {
void ActorManager::PublishCreatedActor(const TaskSpecification &actor_creation_task,
const rpc::Address &address) {
auto actor_id = actor_creation_task.ActorCreationId();
auto data = gcs::CreateActorTableData(actor_creation_task, address,
gcs::ActorTableData::ALIVE, 0);
RAY_CHECK_OK(global_actor_table_.Append(JobID::Nil(), actor_id, data, nullptr));
}
void ActorManager::PublishTerminatedActor(const TaskSpecification &actor_creation_task) {
auto actor_id = actor_creation_task.ActorCreationId();
auto data = gcs::CreateActorTableData(actor_creation_task, rpc::Address(),
gcs::ActorTableData::DEAD, 0);
RAY_CHECK_OK(global_actor_table_.Append(JobID::Nil(), actor_id, data, nullptr));
}
} // namespace ray
+40
View File
@@ -0,0 +1,40 @@
#ifndef RAY_CORE_WORKER_ACTOR_MANAGER_H
#define RAY_CORE_WORKER_ACTOR_MANAGER_H
#include "ray/core_worker/actor_handle.h"
#include "ray/gcs/redis_gcs_client.h"
namespace ray {
// Interface for testing.
class ActorManagerInterface {
public:
virtual void PublishCreatedActor(const TaskSpecification &actor_creation_task,
const rpc::Address &address) = 0;
virtual void PublishTerminatedActor(const TaskSpecification &actor_creation_task) = 0;
virtual ~ActorManagerInterface() {}
};
/// Class to manage lifetimes of actors that we create (actor children).
class ActorManager : public ActorManagerInterface {
public:
ActorManager(gcs::DirectActorTable &global_actor_table)
: global_actor_table_(global_actor_table) {}
/// Called when an actor creation task that we submitted finishes.
void PublishCreatedActor(const TaskSpecification &actor_creation_task,
const rpc::Address &address) override;
/// Called when an actor that we own can no longer be restarted.
void PublishTerminatedActor(const TaskSpecification &actor_creation_task) override;
private:
/// Global database of actors.
gcs::DirectActorTable &global_actor_table_;
};
} // namespace ray
#endif // RAY_CORE_WORKER_ACTOR_MANAGER_H
+45 -33
View File
@@ -102,6 +102,13 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
// Initialize gcs client.
gcs_client_ = std::make_shared<gcs::RedisGcsClient>(gcs_options);
RAY_CHECK_OK(gcs_client_->Connect(io_service_));
direct_actor_table_subscriber_ = std::unique_ptr<
gcs::SubscriptionExecutor<ActorID, gcs::ActorTableData, gcs::DirectActorTable>>(
new gcs::SubscriptionExecutor<ActorID, gcs::ActorTableData, gcs::DirectActorTable>(
gcs_client_->direct_actor_table()));
actor_manager_ =
std::unique_ptr<ActorManager>(new ActorManager(gcs_client_->direct_actor_table()));
// Initialize profiler.
profiler_ = std::make_shared<worker::Profiler>(worker_context_, node_ip_address,
@@ -138,10 +145,6 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
WorkerID::FromBinary(worker_context_.GetWorkerID().Binary()),
(worker_type_ == ray::WorkerType::WORKER), worker_context_.GetCurrentJobID(),
language_, &local_raylet_id, core_worker_server_.GetPort()));
// Unfortunately the raylet client has to be constructed after the receivers.
if (direct_task_receiver_ != nullptr) {
direct_task_receiver_->Init(*local_raylet_client_);
}
// Set our own address.
RAY_CHECK(!local_raylet_id.IsNil());
@@ -177,8 +180,8 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
},
ref_counting_enabled ? reference_counter_ : nullptr, local_raylet_client_));
task_manager_.reset(
new TaskManager(memory_store_, [this](const TaskSpecification &spec) {
task_manager_.reset(new TaskManager(
memory_store_, actor_manager_, [this](const TaskSpecification &spec) {
// Retry after a delay to emulate the existing Raylet reconstruction
// behaviour. TODO(ekl) backoff exponentially.
RAY_LOG(ERROR) << "Will resubmit task after a 5 second delay: "
@@ -229,6 +232,10 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
memory_store_, task_manager_, local_raylet_id,
RayConfig::instance().worker_lease_timeout_milliseconds()));
future_resolver_.reset(new FutureResolver(memory_store_, client_factory));
// Unfortunately the raylet client has to be constructed after the receivers.
if (direct_task_receiver_ != nullptr) {
direct_task_receiver_->Init(*local_raylet_client_, client_factory, rpc_address_);
}
}
CoreWorker::~CoreWorker() {
@@ -275,7 +282,8 @@ void CoreWorker::SetCurrentTaskId(const TaskID &task_id) {
if (actor_id_.IsNil() && task_id.IsNil()) {
absl::MutexLock lock(&actor_handles_mutex_);
for (const auto &handle : actor_handles_) {
RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(handle.first, nullptr));
RAY_CHECK_OK(direct_actor_table_subscriber_->AsyncUnsubscribe(
gcs_client_->client_table().GetLocalClientId(), handle.first, nullptr));
}
actor_handles_.clear();
}
@@ -670,7 +678,9 @@ Status CoreWorker::CreateActor(const RayFunction &function,
worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(),
rpc_address_, function, args, 1, actor_creation_options.resources,
actor_creation_options.placement_resources,
TaskTransportType::RAYLET, &return_ids);
actor_creation_options.is_direct_call ? TaskTransportType::DIRECT
: TaskTransportType::RAYLET,
&return_ids);
builder.SetActorCreationTaskSpec(
actor_id, actor_creation_options.max_reconstructions,
actor_creation_options.dynamic_worker_options,
@@ -685,13 +695,14 @@ Status CoreWorker::CreateActor(const RayFunction &function,
*return_actor_id = actor_id;
TaskSpecification task_spec = builder.Build();
PinObjectReferences(task_spec, TaskTransportType::RAYLET);
// TODO(ekl) if we moved actor creation to use direct call tasks, then we won't
// need to manually resolve direct call args here.
resolver_->ResolveDependencies(task_spec, [this, task_spec]() {
RAY_CHECK_OK(local_raylet_client_->SubmitTask(task_spec));
});
return Status::OK();
if (actor_creation_options.is_direct_call) {
task_manager_->AddPendingTask(task_spec, actor_creation_options.max_reconstructions);
PinObjectReferences(task_spec, TaskTransportType::DIRECT);
return direct_task_submitter_->SubmitTask(task_spec);
} else {
PinObjectReferences(task_spec, TaskTransportType::RAYLET);
return local_raylet_client_->SubmitTask(task_spec);
}
}
Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &function,
@@ -731,7 +742,13 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f
if (is_direct_call) {
task_manager_->AddPendingTask(task_spec);
PinObjectReferences(task_spec, TaskTransportType::DIRECT);
status = direct_actor_submitter_->SubmitTask(task_spec);
if (actor_handle->IsDead()) {
auto status = Status::IOError("sent task to dead actor");
task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED,
&status);
} else {
status = direct_actor_submitter_->SubmitTask(task_spec);
}
} else {
PinObjectReferences(task_spec, TaskTransportType::RAYLET);
RAY_CHECK_OK(local_raylet_client_->SubmitTask(task_spec));
@@ -765,34 +782,29 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle) {
// Register a callback to handle actor notifications.
auto actor_notification_callback = [this](const ActorID &actor_id,
const gcs::ActorTableData &actor_data) {
if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) {
absl::MutexLock lock(&actor_handles_mutex_);
auto it = actor_handles_.find(actor_id);
RAY_CHECK(it != actor_handles_.end());
if (it->second->IsDirectCallActor()) {
// We have to reset the actor handle since the next instance of the
// actor will not have the last sequence number that we sent.
// TODO: Remove the check for direct calls. We do not reset for the
// raylet codepath because it tries to replay all tasks since the
// last actor checkpoint.
it->second->Reset();
}
} else if (actor_data.state() == gcs::ActorTableData::DEAD) {
RAY_CHECK(actor_data.state() != gcs::ActorTableData::RECONSTRUCTING);
if (actor_data.state() == gcs::ActorTableData::DEAD) {
direct_actor_submitter_->DisconnectActor(actor_id, true);
ActorHandle *actor_handle = nullptr;
RAY_CHECK_OK(GetActorHandle(actor_id, &actor_handle));
actor_handle->MarkDead();
// We cannot erase the actor handle here because clients can still
// submit tasks to dead actors. This also means we defer unsubscription,
// otherwise we crash when bulk unsubscribing all actor handles.
} else {
direct_actor_submitter_->ConnectActor(actor_id, actor_data.address());
}
direct_actor_submitter_->HandleActorUpdate(actor_id, actor_data);
RAY_LOG(INFO) << "received notification on actor, state="
<< static_cast<int>(actor_data.state()) << ", actor_id: " << actor_id
<< ", ip address: " << actor_data.address().ip_address()
<< ", port: " << actor_data.address().port();
};
RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe(
actor_id, actor_notification_callback, nullptr));
RAY_CHECK_OK(direct_actor_table_subscriber_->AsyncSubscribe(
gcs_client_->client_table().GetLocalClientId(), actor_id,
actor_notification_callback, nullptr));
}
return inserted;
}
+10
View File
@@ -4,6 +4,7 @@
#include "absl/container/flat_hash_map.h"
#include "ray/common/buffer.h"
#include "ray/core_worker/actor_handle.h"
#include "ray/core_worker/actor_manager.h"
#include "ray/core_worker/common.h"
#include "ray/core_worker/context.h"
#include "ray/core_worker/future_resolver.h"
@@ -15,6 +16,7 @@
#include "ray/core_worker/transport/direct_task_transport.h"
#include "ray/core_worker/transport/raylet_transport.h"
#include "ray/gcs/redis_gcs_client.h"
#include "ray/gcs/subscription_executor.h"
#include "ray/raylet/raylet_client.h"
#include "ray/rpc/node_manager/node_manager_client.h"
#include "ray/rpc/worker/core_worker_client.h"
@@ -537,6 +539,11 @@ class CoreWorker {
// Client to the GCS shared by core worker interfaces.
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;
// Client to listen to direct actor events.
std::unique_ptr<
gcs::SubscriptionExecutor<ActorID, gcs::ActorTableData, gcs::DirectActorTable>>
direct_actor_table_subscriber_;
// Client to the raylet shared by core worker interfaces. This needs to be a
// shared_ptr for direct calls because we can lease multiple workers through
// one client, and we need to keep the connection alive until we return all
@@ -568,6 +575,9 @@ class CoreWorker {
// Tracks the currently pending tasks.
std::shared_ptr<TaskManager> task_manager_;
// Interface for publishing actor creation.
std::shared_ptr<ActorManager> actor_manager_;
// Interface to submit tasks directly to other actors.
std::unique_ptr<CoreWorkerDirectActorTaskSubmitter> direct_actor_submitter_;
+52 -4
View File
@@ -1,7 +1,14 @@
#include "ray/core_worker/task_manager.h"
#include "ray/util/util.h"
namespace ray {
// Start throttling task failure logs once we hit this threshold.
const int64_t kTaskFailureThrottlingThreshold = 50;
// Throttle task failure logs to once this interval.
const int64_t kTaskFailureLoggingFrequencyMillis = 5000;
void TaskManager::AddPendingTask(const TaskSpecification &spec, int max_retries) {
RAY_LOG(DEBUG) << "Adding pending task " << spec.TaskId();
absl::MutexLock lock(&mu_);
@@ -15,13 +22,16 @@ bool TaskManager::IsTaskPending(const TaskID &task_id) const {
}
void TaskManager::CompletePendingTask(const TaskID &task_id,
const rpc::PushTaskReply &reply) {
const rpc::PushTaskReply &reply,
const rpc::Address *actor_addr) {
RAY_LOG(DEBUG) << "Completing task " << task_id;
TaskSpecification spec;
{
absl::MutexLock lock(&mu_);
auto it = pending_tasks_.find(task_id);
RAY_CHECK(it != pending_tasks_.end())
<< "Tried to complete task that was not pending " << task_id;
spec = it->second.first;
pending_tasks_.erase(it);
}
@@ -52,9 +62,15 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
in_memory_store_->Put(RayObject(data_buffer, metadata_buffer), object_id));
}
}
if (spec.IsActorCreationTask()) {
RAY_CHECK(actor_addr != nullptr);
actor_manager_->PublishCreatedActor(spec, *actor_addr);
}
}
void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type) {
void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
Status *status) {
// Note that this might be the __ray_terminate__ task, so we don't log
// loudly with ERROR here.
RAY_LOG(DEBUG) << "Task " << task_id << " failed with error "
@@ -83,20 +99,52 @@ void TaskManager::PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_
<< ", attempting to resubmit.";
retry_task_callback_(spec);
} else {
MarkPendingTaskFailed(task_id, spec.NumReturns(), error_type);
// Throttled logging of task failure errors.
{
absl::MutexLock lock(&mu_);
auto debug_str = spec.DebugString();
if (debug_str.find("__ray_terminate__") == std::string::npos &&
(num_failure_logs_ < kTaskFailureThrottlingThreshold ||
(current_time_ms() - last_log_time_ms_) >
kTaskFailureLoggingFrequencyMillis)) {
if (num_failure_logs_++ == kTaskFailureThrottlingThreshold) {
RAY_LOG(ERROR) << "Too many failure logs, throttling to once every "
<< kTaskFailureLoggingFrequencyMillis << " millis.";
}
last_log_time_ms_ = current_time_ms();
if (status != nullptr) {
RAY_LOG(ERROR) << "Task failed: " << *status << ": " << spec.DebugString();
} else {
RAY_LOG(ERROR) << "Task failed: " << spec.DebugString();
}
}
}
MarkPendingTaskFailed(task_id, spec, error_type);
}
}
void TaskManager::MarkPendingTaskFailed(const TaskID &task_id, int64_t num_returns,
void TaskManager::MarkPendingTaskFailed(const TaskID &task_id,
const TaskSpecification &spec,
rpc::ErrorType error_type) {
RAY_LOG(DEBUG) << "Treat task as failed. task_id: " << task_id
<< ", error_type: " << ErrorType_Name(error_type);
int64_t num_returns = spec.NumReturns();
for (int i = 0; i < num_returns; i++) {
const auto object_id = ObjectID::ForTaskReturn(
task_id, /*index=*/i + 1,
/*transport_type=*/static_cast<int>(TaskTransportType::DIRECT));
RAY_CHECK_OK(in_memory_store_->Put(RayObject(error_type), object_id));
}
if (spec.IsActorCreationTask()) {
actor_manager_->PublishTerminatedActor(spec);
}
}
TaskSpecification TaskManager::GetTaskSpec(const TaskID &task_id) const {
absl::MutexLock lock(&mu_);
auto it = pending_tasks_.find(task_id);
RAY_CHECK(it != pending_tasks_.end());
return it->second.first;
}
} // namespace ray
+28 -8
View File
@@ -7,6 +7,7 @@
#include "ray/common/id.h"
#include "ray/common/task/task.h"
#include "ray/core_worker/actor_manager.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/protobuf/core_worker.pb.h"
#include "ray/protobuf/gcs.pb.h"
@@ -15,10 +16,11 @@ namespace ray {
class TaskFinisherInterface {
public:
virtual void CompletePendingTask(const TaskID &task_id,
const rpc::PushTaskReply &reply) = 0;
virtual void CompletePendingTask(const TaskID &task_id, const rpc::PushTaskReply &reply,
const rpc::Address *actor_addr) = 0;
virtual void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type) = 0;
virtual void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
Status *status = nullptr) = 0;
virtual ~TaskFinisherInterface() {}
};
@@ -28,8 +30,11 @@ using RetryTaskCallback = std::function<void(const TaskSpecification &spec)>;
class TaskManager : public TaskFinisherInterface {
public:
TaskManager(std::shared_ptr<CoreWorkerMemoryStore> in_memory_store,
std::shared_ptr<ActorManagerInterface> actor_manager,
RetryTaskCallback retry_task_callback)
: in_memory_store_(in_memory_store), retry_task_callback_(retry_task_callback) {}
: in_memory_store_(in_memory_store),
actor_manager_(actor_manager),
retry_task_callback_(retry_task_callback) {}
/// Add a task that is pending execution.
///
@@ -49,29 +54,44 @@ class TaskManager : public TaskFinisherInterface {
///
/// \param[in] task_id ID of the pending task.
/// \param[in] reply Proto response to a direct actor or task call.
/// \param[in] actor_addr Address of the created actor, or nullptr.
/// \return Void.
void CompletePendingTask(const TaskID &task_id,
const rpc::PushTaskReply &reply) override;
void CompletePendingTask(const TaskID &task_id, const rpc::PushTaskReply &reply,
const rpc::Address *actor_addr) override;
/// A pending task failed. This will either retry the task or mark the task
/// as failed if there are no retries left.
///
/// \param[in] task_id ID of the pending task.
/// \param[in] error_type The type of the specific error.
void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type) override;
/// \param[in] status Optional status message.
void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
Status *status = nullptr) override;
/// Return the spec for a pending task.
TaskSpecification GetTaskSpec(const TaskID &task_id) const;
private:
/// Treat a pending task as failed. The lock should not be held when calling
/// this method because it may trigger callbacks in this or other classes.
void MarkPendingTaskFailed(const TaskID &task_id, int64_t num_returns,
void MarkPendingTaskFailed(const TaskID &task_id, const TaskSpecification &spec,
rpc::ErrorType error_type) LOCKS_EXCLUDED(mu_);
/// Used to store task results.
std::shared_ptr<CoreWorkerMemoryStore> in_memory_store_;
// Interface for publishing actor creation.
std::shared_ptr<ActorManagerInterface> actor_manager_;
/// Called when a task should be retried.
const RetryTaskCallback retry_task_callback_;
// The number of task failures we have logged total.
int64_t num_failure_logs_ GUARDED_BY(mu_) = 0;
// The last time we logged a task failure.
int64_t last_log_time_ms_ GUARDED_BY(mu_) = 0;
/// Protects below fields.
mutable absl::Mutex mu_;
+11 -9
View File
@@ -993,16 +993,18 @@ TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodes) {
TestActorTask(resources, true);
}
TEST_F(SingleNodeTest, TestDirectActorTaskLocalReconstruction) {
std::unordered_map<std::string, double> resources;
TestActorReconstruction(resources, true);
}
// TODO(ekl) support reconstruction for direct call actors
// TEST_F(SingleNodeTest, TestDirectActorTaskLocalReconstruction) {
// std::unordered_map<std::string, double> resources;
// TestActorReconstruction(resources, true);
//}
TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesReconstruction) {
std::unordered_map<std::string, double> resources;
resources.emplace("resource1", 1);
TestActorReconstruction(resources, true);
}
// TODO(ekl) support reconstruction for direct call actors
// TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesReconstruction) {
// std::unordered_map<std::string, double> resources;
// resources.emplace("resource1", 1);
// TestActorReconstruction(resources, true);
//}
TEST_F(SingleNodeTest, TestDirectActorTaskLocalFailure) {
std::unordered_map<std::string, double> resources;
@@ -41,8 +41,10 @@ class MockTaskFinisher : public TaskFinisherInterface {
public:
MockTaskFinisher() {}
MOCK_METHOD2(CompletePendingTask, void(const TaskID &, const rpc::PushTaskReply &));
MOCK_METHOD2(PendingTaskFailed, void(const TaskID &task_id, rpc::ErrorType error_type));
MOCK_METHOD3(CompletePendingTask, void(const TaskID &, const rpc::PushTaskReply &,
const rpc::Address *addr));
MOCK_METHOD3(PendingTaskFailed,
void(const TaskID &task_id, rpc::ErrorType error_type, Status *status));
};
TaskSpecification CreateActorTaskHelper(ActorID actor_id, int64_t counter) {
@@ -70,32 +72,32 @@ class DirectActorTransportTest : public ::testing::Test {
};
TEST_F(DirectActorTransportTest, TestSubmitTask) {
rpc::Address addr;
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
auto task = CreateActorTaskHelper(actor_id, 0);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_EQ(worker_client_->callbacks.size(), 0);
gcs::ActorTableData actor_data;
submitter_.HandleActorUpdate(actor_id, actor_data);
submitter_.ConnectActor(actor_id, addr);
ASSERT_EQ(worker_client_->callbacks.size(), 1);
task = CreateActorTaskHelper(actor_id, 1);
ASSERT_TRUE(submitter_.SubmitTask(task).ok());
ASSERT_EQ(worker_client_->callbacks.size(), 2);
EXPECT_CALL(*task_finisher_, CompletePendingTask(TaskID::Nil(), _))
EXPECT_CALL(*task_finisher_, CompletePendingTask(TaskID::Nil(), _, _))
.Times(worker_client_->callbacks.size());
EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _)).Times(0);
EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _)).Times(0);
while (!worker_client_->callbacks.empty()) {
ASSERT_TRUE(worker_client_->ReplyPushTask());
}
}
TEST_F(DirectActorTransportTest, TestDependencies) {
rpc::Address addr;
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
gcs::ActorTableData actor_data;
submitter_.HandleActorUpdate(actor_id, actor_data);
submitter_.ConnectActor(actor_id, addr);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
// Create two tasks for the actor with different arguments.
@@ -121,9 +123,9 @@ TEST_F(DirectActorTransportTest, TestDependencies) {
}
TEST_F(DirectActorTransportTest, TestOutOfOrderDependencies) {
rpc::Address addr;
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
gcs::ActorTableData actor_data;
submitter_.HandleActorUpdate(actor_id, actor_data);
submitter_.ConnectActor(actor_id, addr);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
// Create two tasks for the actor with different arguments.
@@ -150,9 +152,10 @@ TEST_F(DirectActorTransportTest, TestOutOfOrderDependencies) {
}
TEST_F(DirectActorTransportTest, TestActorFailure) {
rpc::Address addr;
ActorID actor_id = ActorID::Of(JobID::FromInt(0), TaskID::Nil(), 0);
gcs::ActorTableData actor_data;
submitter_.HandleActorUpdate(actor_id, actor_data);
submitter_.ConnectActor(actor_id, addr);
ASSERT_EQ(worker_client_->callbacks.size(), 0);
// Create two tasks for the actor.
@@ -163,8 +166,8 @@ TEST_F(DirectActorTransportTest, TestActorFailure) {
ASSERT_EQ(worker_client_->callbacks.size(), 2);
// Simulate the actor dying. All submitted tasks should get failed.
EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _)).Times(2);
EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _)).Times(0);
EXPECT_CALL(*task_finisher_, PendingTaskFailed(_, _, _)).Times(2);
EXPECT_CALL(*task_finisher_, CompletePendingTask(_, _, _)).Times(0);
while (!worker_client_->callbacks.empty()) {
ASSERT_TRUE(worker_client_->ReplyPushTask(Status::IOError("")));
}
@@ -41,10 +41,13 @@ class MockTaskFinisher : public TaskFinisherInterface {
public:
MockTaskFinisher() {}
void CompletePendingTask(const TaskID &, const rpc::PushTaskReply &) override {
void CompletePendingTask(const TaskID &, const rpc::PushTaskReply &,
const rpc::Address *actor_addr) override {
num_tasks_complete++;
}
void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type) override {
void PendingTaskFailed(const TaskID &task_id, rpc::ErrorType error_type,
Status *status) override {
num_tasks_failed++;
}
+19 -2
View File
@@ -1,6 +1,7 @@
#include "gtest/gtest.h"
#include "ray/common/task/task_spec.h"
#include "ray/core_worker/actor_manager.h"
#include "ray/core_worker/store_provider/memory_store/memory_store.h"
#include "ray/core_worker/task_manager.h"
#include "ray/util/test_util.h"
@@ -14,16 +15,32 @@ TaskSpecification CreateTaskHelper(uint64_t num_returns) {
return task;
}
class MockActorManager : public ActorManagerInterface {
void PublishCreatedActor(const TaskSpecification &actor_creation_task,
const rpc::Address &address) override {
num_publishes += 1;
}
void PublishTerminatedActor(const TaskSpecification &actor_creation_task) override {
num_terminations += 1;
}
int num_publishes = 0;
int num_terminations = 0;
};
class TaskManagerTest : public ::testing::Test {
public:
TaskManagerTest()
: store_(std::shared_ptr<CoreWorkerMemoryStore>(new CoreWorkerMemoryStore())),
manager_(store_, [this](const TaskSpecification &spec) {
actor_manager_(std::shared_ptr<ActorManagerInterface>(new MockActorManager())),
manager_(store_, actor_manager_, [this](const TaskSpecification &spec) {
num_retries_++;
return Status::OK();
}) {}
std::shared_ptr<CoreWorkerMemoryStore> store_;
std::shared_ptr<ActorManagerInterface> actor_manager_;
TaskManager manager_;
int num_retries_ = 0;
};
@@ -41,7 +58,7 @@ TEST_F(TaskManagerTest, TestTaskSuccess) {
return_object->set_object_id(return_id.Binary());
auto data = GenerateRandomBuffer();
return_object->set_data(data->Data(), data->Size());
manager_.CompletePendingTask(spec.TaskId(), reply);
manager_.CompletePendingTask(spec.TaskId(), reply, nullptr);
ASSERT_FALSE(manager_.IsTaskPending(spec.TaskId()));
std::vector<std::shared_ptr<RayObject>> results;
@@ -21,7 +21,6 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
resolver_.ResolveDependencies(task_spec, [this, send_pos, task_spec]() mutable {
const auto &actor_id = task_spec.ActorId();
const auto task_id = task_spec.TaskId();
auto request = std::unique_ptr<rpc::PushTaskRequest>(new rpc::PushTaskRequest);
// NOTE(swang): CopyFrom is needed because if we use Swap here and the task
@@ -31,26 +30,20 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
absl::MutexLock lock(&mu_);
auto iter = actor_states_.find(actor_id);
if (iter == actor_states_.end() ||
iter->second.state_ == ActorTableData::RECONSTRUCTING) {
auto inserted = pending_requests_[actor_id].emplace(send_pos, std::move(request));
RAY_CHECK(inserted.second);
auto it = rpc_clients_.find(actor_id);
if (it == rpc_clients_.end()) {
// Actor is not yet created, or is being reconstructed, cache the request
// and submit after actor is alive.
// TODO(zhijunfu): it might be possible for a user to specify an invalid
// actor handle (e.g. from unpickling), in that case it might be desirable
// to have a timeout to mark it as invalid if it doesn't show up in the
// specified time.
auto inserted = pending_requests_[actor_id].emplace(send_pos, std::move(request));
RAY_CHECK(inserted.second);
RAY_LOG(DEBUG) << "Actor " << actor_id << " is not yet created.";
} else if (iter->second.state_ == ActorTableData::ALIVE) {
auto inserted = pending_requests_[actor_id].emplace(send_pos, std::move(request));
RAY_CHECK(inserted.second);
SendPendingTasks(actor_id);
} else {
// Actor is dead, treat the task as failure.
RAY_CHECK(iter->second.state_ == ActorTableData::DEAD);
task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED);
SendPendingTasks(actor_id);
}
});
@@ -59,29 +52,30 @@ Status CoreWorkerDirectActorTaskSubmitter::SubmitTask(TaskSpecification task_spe
return Status::OK();
}
void CoreWorkerDirectActorTaskSubmitter::HandleActorUpdate(
const ActorID &actor_id, const ActorTableData &actor_data) {
void CoreWorkerDirectActorTaskSubmitter::ConnectActor(const ActorID &actor_id,
const rpc::Address &address) {
absl::MutexLock lock(&mu_);
actor_states_.erase(actor_id);
actor_states_.emplace(
actor_id, ActorStateData(actor_data.state(), actor_data.address().ip_address(),
actor_data.address().port()));
// Create a new connection to the actor.
if (rpc_clients_.count(actor_id) == 0) {
rpc::WorkerAddress addr = {address.ip_address(), address.port()};
rpc_clients_[actor_id] =
std::shared_ptr<rpc::CoreWorkerClientInterface>(client_factory_(addr));
}
if (pending_requests_.count(actor_id) > 0) {
SendPendingTasks(actor_id);
}
}
if (actor_data.state() == ActorTableData::ALIVE) {
// Create a new connection to the actor.
if (rpc_clients_.count(actor_id) == 0) {
rpc::WorkerAddress addr = {actor_data.address().ip_address(),
actor_data.address().port()};
rpc_clients_[actor_id] =
std::shared_ptr<rpc::CoreWorkerClientInterface>(client_factory_(addr));
}
if (pending_requests_.count(actor_id) > 0) {
SendPendingTasks(actor_id);
}
} else {
// Remove rpc client if it's dead or being reconstructed.
void CoreWorkerDirectActorTaskSubmitter::DisconnectActor(const ActorID &actor_id,
bool dead) {
absl::MutexLock lock(&mu_);
if (!dead) {
// We're reconstructing the actor, so erase the client for now. The new client
// will be inserted once actor reconstruction completes. We don't erase the
// client when the actor is DEAD, so that all further tasks will be failed.
rpc_clients_.erase(actor_id);
} else {
RAY_LOG(INFO) << "Failing pending tasks for actor " << actor_id;
// If there are pending requests, treat the pending tasks as failed.
auto pending_it = pending_requests_.find(actor_id);
if (pending_it != pending_requests_.end()) {
@@ -90,16 +84,15 @@ void CoreWorkerDirectActorTaskSubmitter::HandleActorUpdate(
auto request = std::move(head->second);
head = pending_it->second.erase(head);
auto task_id = TaskID::FromBinary(request->task_spec().task_id());
task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED);
auto status = Status::IOError("cancelling all pending tasks of dead actor");
task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED, &status);
}
pending_requests_.erase(pending_it);
}
next_send_position_.erase(actor_id);
next_send_position_to_assign_.erase(actor_id);
// No need to clean up tasks that have been sent and are waiting for
// replies. They will be treated as failed once the connection dies.
// We retain the sequencing information so that we can properly fail
// any tasks submitted after the actor death.
}
}
@@ -129,9 +122,9 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(
std::move(request),
[this, task_id](Status status, const rpc::PushTaskReply &reply) {
if (!status.ok()) {
task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED);
task_finisher_->PendingTaskFailed(task_id, rpc::ErrorType::ACTOR_DIED, &status);
} else {
task_finisher_->CompletePendingTask(task_id, reply);
task_finisher_->CompletePendingTask(task_id, reply, nullptr);
}
}));
}
@@ -139,20 +132,16 @@ void CoreWorkerDirectActorTaskSubmitter::PushActorTask(
bool CoreWorkerDirectActorTaskSubmitter::IsActorAlive(const ActorID &actor_id) const {
absl::MutexLock lock(&mu_);
auto iter = actor_states_.find(actor_id);
return (iter != actor_states_.end() && iter->second.state_ == ActorTableData::ALIVE);
auto iter = rpc_clients_.find(actor_id);
return (iter != rpc_clients_.end());
}
CoreWorkerDirectTaskReceiver::CoreWorkerDirectTaskReceiver(
WorkerContext &worker_context, boost::asio::io_service &main_io_service,
const TaskHandler &task_handler, const std::function<void()> &exit_handler)
: worker_context_(worker_context),
task_handler_(task_handler),
exit_handler_(exit_handler),
task_main_io_service_(main_io_service) {}
void CoreWorkerDirectTaskReceiver::Init(raylet::RayletClient &raylet_client) {
void CoreWorkerDirectTaskReceiver::Init(raylet::RayletClient &raylet_client,
rpc::ClientFactoryFn client_factory,
rpc::Address rpc_address) {
waiter_.reset(new DependencyWaiterImpl(raylet_client));
rpc_address_ = rpc_address;
client_factory_ = client_factory;
}
void CoreWorkerDirectTaskReceiver::SetMaxActorConcurrency(int max_concurrency) {
@@ -192,7 +181,7 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
rpc::SendReplyCallback send_reply_callback) {
RAY_CHECK(waiter_ != nullptr) << "Must call init() prior to use";
const TaskSpecification task_spec(request.task_spec());
RAY_LOG(DEBUG) << "Received task " << task_spec.TaskId();
RAY_LOG(DEBUG) << "Received task " << task_spec.DebugString();
if (task_spec.IsActorTask() && !worker_context_.CurrentTaskIsDirectCall()) {
send_reply_callback(Status::Invalid("This actor doesn't accept direct calls."),
nullptr, nullptr);
@@ -211,15 +200,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
}
}
auto it = scheduling_queue_.find(task_spec.CallerId());
if (it == scheduling_queue_.end()) {
auto result = scheduling_queue_.emplace(
task_spec.CallerId(),
std::unique_ptr<SchedulingQueue>(new SchedulingQueue(
task_main_io_service_, *waiter_, pool_, is_asyncio_, fiber_rate_limiter_)));
it = result.first;
}
// Only assign resources for non-actor tasks. Actor tasks inherit the resources
// assigned at initial actor creation time.
std::shared_ptr<ResourceMappingType> resource_ids;
@@ -289,10 +269,25 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
}
};
// Run actor creation task immediately on the main thread, without going
// through a scheduling queue.
if (task_spec.IsActorCreationTask()) {
accept_callback();
return;
}
auto reject_callback = [send_reply_callback]() {
send_reply_callback(Status::Invalid("client cancelled stale rpc"), nullptr, nullptr);
};
auto it = scheduling_queue_.find(task_spec.CallerId());
if (it == scheduling_queue_.end()) {
auto result = scheduling_queue_.emplace(
task_spec.CallerId(),
std::unique_ptr<SchedulingQueue>(new SchedulingQueue(
task_main_io_service_, *waiter_, pool_, is_asyncio_, fiber_rate_limiter_)));
it = result.first;
}
it->second->Add(request.sequence_number(), request.client_processed_up_to(),
accept_callback, reject_callback, dependencies);
}
@@ -32,18 +32,6 @@ const int kMaxReorderWaitSeconds = 30;
/// In direct actor call task submitter and receiver, a task is directly submitted
/// to the actor that will execute it.
/// The state data for an actor.
struct ActorStateData {
ActorStateData(gcs::ActorTableData::ActorState state, const std::string &ip, int port)
: state_(state), location_(std::make_pair(ip, port)) {}
/// Actor's state (e.g. alive, dead, reconstrucing).
gcs::ActorTableData::ActorState state_;
/// IP address and port that the actor is listening on.
std::pair<std::string, int> location_;
};
// This class is thread-safe.
class CoreWorkerDirectActorTaskSubmitter {
public:
@@ -60,11 +48,16 @@ class CoreWorkerDirectActorTaskSubmitter {
/// \return Status::Invalid if the task is not yet supported.
Status SubmitTask(TaskSpecification task_spec);
/// Handle an update about an actor.
/// Create connection to actor and send all pending tasks.
///
/// \param[in] actor_id The ID of the actor whose status has changed.
/// \param[in] actor_data The actor's new status information.
void HandleActorUpdate(const ActorID &actor_id, const gcs::ActorTableData &actor_data);
/// \param[in] actor_id Actor ID.
/// \param[in] address The new address of the actor.
void ConnectActor(const ActorID &actor_id, const rpc::Address &address);
/// Disconnect from a failed actor.
///
/// \param[in] actor_id Actor ID.
void DisconnectActor(const ActorID &actor_id, bool dead = false);
private:
/// Push a task to a remote actor via the given client.
@@ -102,9 +95,6 @@ class CoreWorkerDirectActorTaskSubmitter {
/// Mutex to proect the various maps below.
mutable absl::Mutex mu_;
/// Map from actor id to actor state. This only includes actors that we send tasks to.
absl::flat_hash_map<ActorID, ActorStateData> actor_states_ GUARDED_BY(mu_);
/// Map from actor id to rpc client. This only includes actors that we send tasks to.
/// We use shared_ptr to enable shared_from_this for pending client callbacks.
///
@@ -429,7 +419,11 @@ class CoreWorkerDirectTaskReceiver {
CoreWorkerDirectTaskReceiver(WorkerContext &worker_context,
boost::asio::io_service &main_io_service,
const TaskHandler &task_handler,
const std::function<void()> &exit_handler);
const std::function<void()> &exit_handler)
: worker_context_(worker_context),
task_handler_(task_handler),
exit_handler_(exit_handler),
task_main_io_service_(main_io_service) {}
~CoreWorkerDirectTaskReceiver() {
fiber_shutdown_event_.Notify();
@@ -437,7 +431,8 @@ class CoreWorkerDirectTaskReceiver {
}
/// Initialize this receiver. This must be called prior to use.
void Init(raylet::RayletClient &client);
void Init(raylet::RayletClient &client, rpc::ClientFactoryFn client_factory,
rpc::Address rpc_address);
/// Handle a `PushTask` request.
///
@@ -471,6 +466,10 @@ class CoreWorkerDirectTaskReceiver {
std::function<void()> exit_handler_;
/// The IO event loop for running tasks on.
boost::asio::io_service &task_main_io_service_;
/// Factory for producing new core worker clients.
rpc::ClientFactoryFn client_factory_;
/// Address of our RPC server.
rpc::Address rpc_address_;
/// Shared waiter for dependencies required by incoming tasks.
std::unique_ptr<DependencyWaiterImpl> waiter_;
/// Queue of pending requests per actor handle.
@@ -42,6 +42,7 @@ void CoreWorkerDirectTaskSubmitter::OnWorkerIdle(
auto lease_entry = worker_to_lease_client_[addr];
auto queue_entry = task_queues_.find(scheduling_key);
// Return the worker if there was an error executing the previous task,
// the previous task is an actor creation task,
// there are no more applicable queued tasks, or the lease is expired.
if (was_error || queue_entry == task_queues_.end() ||
current_time_ms() > lease_entry.second) {
@@ -115,10 +116,12 @@ void CoreWorkerDirectTaskSubmitter::RequestNewWorkerIfNeeded(
RAY_LOG(DEBUG) << "Lease granted " << task_id;
rpc::WorkerAddress addr = {
reply.worker_address().ip_address(), reply.worker_address().port(),
WorkerID::FromBinary(reply.worker_address().worker_id())};
WorkerID::FromBinary(reply.worker_address().worker_id()),
ClientID::FromBinary(reply.worker_address().raylet_id())};
AddWorkerLeaseClient(addr, std::move(lease_client));
auto resources_copy = reply.resource_mapping();
OnWorkerIdle(addr, scheduling_key, /*error=*/false, resources_copy);
OnWorkerIdle(addr, scheduling_key,
/*error=*/false, resources_copy);
} else {
// The raylet redirected us to a different raylet to retry at.
RequestNewWorkerIfNeeded(scheduling_key, &reply.retry_at_raylet_address());
@@ -158,6 +161,8 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
auto task_id = task_spec.TaskId();
auto request = std::unique_ptr<rpc::PushTaskRequest>(new rpc::PushTaskRequest);
bool is_actor = task_spec.IsActorTask();
bool is_actor_creation = task_spec.IsActorCreationTask();
RAY_LOG(DEBUG) << "Pushing normal task " << task_spec.TaskId();
// NOTE(swang): CopyFrom is needed because if we use Swap here and the task
// fails, then the task data will be gone when the TaskManager attempts to
@@ -166,9 +171,10 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
request->mutable_resource_mapping()->CopyFrom(assigned_resources);
auto status = client.PushNormalTask(
std::move(request),
[this, task_id, is_actor, scheduling_key, addr, assigned_resources](
Status status, const rpc::PushTaskReply &reply) {
{
[this, task_id, is_actor, is_actor_creation, scheduling_key, addr,
assigned_resources](Status status, const rpc::PushTaskReply &reply) {
// Successful actor creation leases the worker indefinitely from the raylet.
if (!status.ok() || !is_actor_creation) {
absl::MutexLock lock(&mu_);
OnWorkerIdle(addr, scheduling_key, /*error=*/!status.ok(), assigned_resources);
}
@@ -177,11 +183,13 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
// failure (e.g., by contacting the raylet). If it was a process
// failure, it may have been an application-level error and it may
// not make sense to retry the task.
task_finisher_->PendingTaskFailed(task_id, is_actor
? rpc::ErrorType::ACTOR_DIED
: rpc::ErrorType::WORKER_DIED);
task_finisher_->PendingTaskFailed(
task_id,
is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED,
&status);
} else {
task_finisher_->CompletePendingTask(task_id, reply);
rpc::Address proto = addr.ToProto();
task_finisher_->CompletePendingTask(task_id, reply, &proto);
}
});
if (!status.ok()) {
@@ -191,7 +199,8 @@ void CoreWorkerDirectTaskSubmitter::PushNormalTask(
OnWorkerIdle(addr, scheduling_key, /*error=*/true, assigned_resources);
}
task_finisher_->PendingTaskFailed(
task_id, is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED);
task_id, is_actor ? rpc::ErrorType::ACTOR_DIED : rpc::ErrorType::WORKER_DIED,
&status);
}
}
}; // namespace ray
+25
View File
@@ -7,6 +7,31 @@ namespace ray {
namespace gcs {
std::shared_ptr<gcs::ActorTableData> CreateActorTableData(
const TaskSpecification &task_spec, const rpc::Address &address,
gcs::ActorTableData::ActorState state, uint64_t remaining_reconstructions) {
RAY_CHECK(task_spec.IsActorCreationTask());
auto actor_id = task_spec.ActorCreationId();
auto actor_info_ptr = std::make_shared<ray::gcs::ActorTableData>();
// Set all of the static fields for the actor. These fields will not change
// even if the actor fails or is reconstructed.
actor_info_ptr->set_actor_id(actor_id.Binary());
actor_info_ptr->set_parent_id(task_spec.CallerId().Binary());
actor_info_ptr->set_actor_creation_dummy_object_id(
task_spec.ActorDummyObject().Binary());
actor_info_ptr->set_job_id(task_spec.JobId().Binary());
actor_info_ptr->set_max_reconstructions(task_spec.MaxActorReconstructions());
actor_info_ptr->set_is_detached(task_spec.IsDetachedActor());
// Set the fields that change when the actor is restarted.
actor_info_ptr->set_remaining_reconstructions(remaining_reconstructions);
actor_info_ptr->set_is_direct_call(task_spec.IsDirectCall());
actor_info_ptr->mutable_address()->CopyFrom(address);
actor_info_ptr->mutable_owner_address()->CopyFrom(
task_spec.GetMessage().caller_address());
actor_info_ptr->set_state(state);
return actor_info_ptr;
}
RedisActorInfoAccessor::RedisActorInfoAccessor(RedisGcsClient *client_impl)
: client_impl_(client_impl), actor_sub_executor_(client_impl_->actor_table()) {}
+5
View File
@@ -2,6 +2,7 @@
#define RAY_GCS_REDIS_ACTOR_INFO_ACCESSOR_H
#include "ray/common/id.h"
#include "ray/common/task/task_spec.h"
#include "ray/gcs/actor_info_accessor.h"
#include "ray/gcs/callback.h"
#include "ray/gcs/subscription_executor.h"
@@ -11,6 +12,10 @@ namespace ray {
namespace gcs {
std::shared_ptr<gcs::ActorTableData> CreateActorTableData(
const TaskSpecification &task_spec, const rpc::Address &address,
gcs::ActorTableData::ActorState state, uint64_t remaining_reconstructions);
class RedisGcsClient;
/// \class RedisActorInfoAccessor
+3
View File
@@ -126,6 +126,7 @@ Status RedisGcsClient::Connect(boost::asio::io_service &io_service) {
Attach(io_service);
actor_table_.reset(new ActorTable({primary_context_}, this));
direct_actor_table_.reset(new DirectActorTable({primary_context_}, this));
// TODO(micafan) Modify ClientTable' Constructor(remove ClientID) in future.
// We will use NodeID instead of ClientID.
@@ -200,6 +201,8 @@ raylet::TaskTable &RedisGcsClient::raylet_task_table() { return *raylet_task_tab
ActorTable &RedisGcsClient::actor_table() { return *actor_table_; }
DirectActorTable &RedisGcsClient::direct_actor_table() { return *direct_actor_table_; }
TaskReconstructionLog &RedisGcsClient::task_reconstruction_log() {
return *task_reconstruction_log_;
}
+5
View File
@@ -66,6 +66,10 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
ActorCheckpointTable &actor_checkpoint_table();
ActorCheckpointIdTable &actor_checkpoint_id_table();
DynamicResourceTable &resource_table();
/// Used only for direct calls. Tasks submitted through the raylet transport
/// should use Actors(), which has a requirement on the order in which
/// entries can be appended to the log.
DirectActorTable &direct_actor_table();
// We also need something to export generic code to run on workers from the
// driver (to set the PYTHONPATH)
@@ -100,6 +104,7 @@ class RAY_EXPORT RedisGcsClient : public GcsClient {
std::unique_ptr<ObjectTable> object_table_;
std::unique_ptr<raylet::TaskTable> raylet_task_table_;
std::unique_ptr<ActorTable> actor_table_;
std::unique_ptr<DirectActorTable> direct_actor_table_;
std::unique_ptr<TaskReconstructionLog> task_reconstruction_log_;
std::unique_ptr<TaskLeaseTable> task_lease_table_;
std::unique_ptr<HeartbeatTable> heartbeat_table_;
+1
View File
@@ -187,6 +187,7 @@ Status SubscriptionExecutor<ID, Data, Table>::AsyncUnsubscribe(
}
template class SubscriptionExecutor<ActorID, ActorTableData, ActorTable>;
template class SubscriptionExecutor<ActorID, ActorTableData, DirectActorTable>;
template class SubscriptionExecutor<JobID, JobTableData, JobTable>;
} // namespace gcs
+10
View File
@@ -671,6 +671,16 @@ class ActorTable : public Log<ActorID, ActorTableData> {
}
};
class DirectActorTable : public Log<ActorID, ActorTableData> {
public:
DirectActorTable(const std::vector<std::shared_ptr<RedisContext>> &contexts,
RedisGcsClient *client)
: Log(contexts, client) {
pubsub_channel_ = TablePubsub::DIRECT_ACTOR_PUBSUB;
prefix_ = TablePrefix::DIRECT_ACTOR;
}
};
class TaskReconstructionLog : public Log<TaskID, TaskReconstructionData> {
public:
TaskReconstructionLog(const std::vector<std::shared_ptr<RedisContext>> &contexts,
+2
View File
@@ -96,7 +96,9 @@ message DirectActorCallArgWaitCompleteReply {
}
message GetObjectStatusRequest {
// The owner of the object.
bytes owner_id = 1;
// Wait for this object's status.
bytes object_id = 2;
}
+6 -4
View File
@@ -26,7 +26,8 @@ enum TablePrefix {
ACTOR_CHECKPOINT = 15;
ACTOR_CHECKPOINT_ID = 16;
NODE_RESOURCE = 17;
TABLE_PREFIX_MAX = 18;
DIRECT_ACTOR = 18;
TABLE_PREFIX_MAX = 19;
}
// The channel that Add operations to the Table should be published on, if any.
@@ -44,7 +45,8 @@ enum TablePubsub {
TASK_LEASE_PUBSUB = 10;
JOB_PUBSUB = 11;
NODE_RESOURCE_PUBSUB = 12;
TABLE_PUBSUB_MAX = 13;
DIRECT_ACTOR_PUBSUB = 13;
TABLE_PUBSUB_MAX = 14;
}
enum GcsChangeMode {
@@ -89,8 +91,8 @@ message ActorTableData {
}
// The ID of the actor that was created.
bytes actor_id = 1;
// The ID of the actor that created it, null if created by non-actor task.
bytes parent_actor_id = 2;
// The ID of the caller of the actor creation task.
bytes parent_id = 2;
// The dummy object ID returned by the actor creation task. If the actor
// dies, then this is the object that should be reconstructed for the actor
// to be recreated.
+3 -2
View File
@@ -2321,7 +2321,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) {
worker.ResetTaskResourceIds();
const auto &spec = task.GetTaskSpecification();
if (spec.IsActorCreationTask() || spec.IsActorTask()) {
if ((spec.IsActorCreationTask() || spec.IsActorTask()) && !spec.IsDirectCall()) {
// If this was an actor or actor creation task, handle the actor's new
// state.
FinishAssignedActorTask(worker, task);
@@ -2406,6 +2406,7 @@ std::shared_ptr<ActorTableData> NodeManager::CreateActorTableDataFromCreationTas
}
void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
RAY_LOG(INFO) << "Finishing assigned actor task";
ActorID actor_id;
TaskID caller_id;
const TaskSpecification task_spec = task.GetTaskSpecification();
@@ -2508,7 +2509,7 @@ void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id
// Notify the other node managers that the actor has been created.
const ActorID actor_id = task_spec.ActorCreationId();
auto new_actor_info = CreateActorTableDataFromCreationTask(task_spec, port);
new_actor_info->set_parent_actor_id(parent_actor_id.Binary());
new_actor_info->set_parent_id(parent_actor_id.Binary());
auto update_callback = [actor_id](Status status) {
if (!status.ok()) {
// Only one node at a time should succeed at creating or updating the actor.
+14 -2
View File
@@ -42,12 +42,21 @@ class WorkerAddress {
public:
template <typename H>
friend H AbslHashValue(H h, const WorkerAddress &w) {
return H::combine(std::move(h), w.ip_address, w.port, w.worker_id);
return H::combine(std::move(h), w.ip_address, w.port, w.worker_id, w.raylet_id);
}
bool operator==(const WorkerAddress &other) const {
return other.ip_address == ip_address && other.port == port &&
other.worker_id == worker_id;
other.worker_id == worker_id && other.raylet_id == raylet_id;
}
rpc::Address ToProto() const {
rpc::Address addr;
addr.set_raylet_id(raylet_id.Binary());
addr.set_ip_address(ip_address);
addr.set_port(port);
addr.set_worker_id(worker_id.Binary());
return addr;
}
/// The ip address of the worker.
@@ -56,6 +65,8 @@ class WorkerAddress {
const int port;
/// The unique id of the worker.
const WorkerID worker_id;
/// The unique id of the worker raylet.
const ClientID raylet_id;
};
typedef std::function<std::shared_ptr<CoreWorkerClientInterface>(const WorkerAddress &)>
@@ -184,6 +195,7 @@ class CoreWorkerClient : public std::enable_shared_from_this<CoreWorkerClient>,
*stub_, &CoreWorkerService::Stub::PrepareAsyncGetObjectStatus, request, callback);
return call->GetStatus();
}
/// Send as many pending tasks as possible. This method is thread-safe.
///
/// The client will guarantee no more than kMaxBytesInFlight bytes of RPCs are being