Fix actor reconstruction with direct call (#6570)

This commit is contained in:
Zhijun Fu
2019-12-26 10:59:50 +08:00
committed by Hao Chen
parent be23b3ac41
commit d2bba596ab
16 changed files with 140 additions and 84 deletions
@@ -47,9 +47,6 @@ public class ActorReconstructionTest extends BaseTest {
public void testActorReconstruction() throws InterruptedException, IOException {
TestUtils.skipTestUnderSingleProcess();
// TODO (kfstorm): Actor reconstruction is currently not supporeted in direct actor call mode.
// Will re-enable the test once the issue got fixed.
TestUtils.skipTestIfDirectActorCallEnabled();
ActorCreationOptions options =
new ActorCreationOptions.Builder().setMaxReconstructions(1).createActorCreationOptions();
RayActor<Counter> actor = Ray.createActor(Counter::new, options);
@@ -130,10 +127,6 @@ public class ActorReconstructionTest extends BaseTest {
public void testActorCheckpointing() throws IOException, InterruptedException {
TestUtils.skipTestUnderSingleProcess();
// TODO (kfstorm): In direct actor call mode, the actor creation task is not pushed to raylet.
// But to save an actor checkpoint, raylet needs to know about this the actor. Will re-enable
// the test once the issue got fixed.
TestUtils.skipTestIfDirectActorCallEnabled();
ActorCreationOptions options =
new ActorCreationOptions.Builder().setMaxReconstructions(1).createActorCreationOptions();
RayActor<CheckpointableCounter> actor = Ray.createActor(CheckpointableCounter::new, options);
-1
View File
@@ -836,7 +836,6 @@ def test_actor_init_fails(ray_start_cluster_head):
assert results == [1 for actor in actors]
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no ft yet")
def test_reconstruction_suppression(ray_start_cluster_head):
cluster = ray_start_cluster_head
num_nodes = 5
+21 -1
View File
@@ -12,6 +12,8 @@ import pytest
import ray
import ray.ray_constants as ray_constants
RAY_FORCE_DIRECT = ray_constants.direct_call_enabled()
@pytest.mark.parametrize(
"ray_start_cluster", [{
@@ -29,13 +31,16 @@ def test_actor_creation_node_failure(ray_start_cluster):
def __init__(self, death_probability):
self.death_probability = death_probability
def get_probability(self):
return self.death_probability
def ping(self):
# Exit process with some probability.
exit_chance = np.random.rand()
if exit_chance < self.death_probability:
sys.exit(-1)
num_children = 50
num_children = 25
# Children actors will die about half the time.
death_probability = 0.5
@@ -58,6 +63,21 @@ def test_actor_creation_node_failure(ray_start_cluster):
ray.get(out)
except ray.exceptions.RayActorError:
children[i] = Child.remote(death_probability)
if (RAY_FORCE_DIRECT):
children_out = [
child.get_probability.remote() for child in children
]
# Wait for new created actors to finish creation before
# removing a node. This is needed because right now we don't
# support reconstructing actors that died in the process of
# being created.
ready, _ = ray.wait(
children_out,
num_returns=len(children_out),
timeout=5 * 60.0)
assert len(ready) == len(children_out)
# Remove a node. Any actor creation tasks that were forwarded to this
# node must be reconstructed.
cluster.remove_node(cluster.list_all_nodes()[-1])
+7 -1
View File
@@ -125,7 +125,13 @@ bool WorkerContext::CurrentThreadIsMain() const {
}
bool WorkerContext::ShouldReleaseResourcesOnBlockingCalls() const {
return !CurrentActorIsDirectCall() && CurrentThreadIsMain();
// Check if we need to release resources when we block:
// - Driver doesn't acquire resources and thus doesn't need to release.
// - We only support lifetime resources for direct actors, which can be
// acquired when the actor is created, per call resources are not supported,
// thus we don't need to release resources for direct actor call.
return worker_type_ != WorkerType::DRIVER && !CurrentActorIsDirectCall() &&
CurrentThreadIsMain();
}
bool WorkerContext::CurrentActorIsDirectCall() const {
+18 -15
View File
@@ -101,13 +101,6 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
// Initialize gcs client.
gcs_client_ = std::make_shared<gcs::RedisGcsClient>(gcs_options);
RAY_CHECK_OK(gcs_client_->Connect(io_service_));
direct_actor_table_subscriber_ = std::unique_ptr<
gcs::SubscriptionExecutor<ActorID, gcs::ActorTableData, gcs::DirectActorTable>>(
new gcs::SubscriptionExecutor<ActorID, gcs::ActorTableData, gcs::DirectActorTable>(
gcs_client_->direct_actor_table()));
actor_manager_ =
std::unique_ptr<ActorManager>(new ActorManager(gcs_client_->direct_actor_table()));
// Initialize profiler.
profiler_ = std::make_shared<worker::Profiler>(worker_context_, node_ip_address,
@@ -194,8 +187,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language,
ref_counting_enabled ? reference_counter_ : nullptr, local_raylet_client_));
task_manager_.reset(new TaskManager(
memory_store_, reference_counter_, actor_manager_,
[this](const TaskSpecification &spec) {
memory_store_, reference_counter_, nullptr, [this](const TaskSpecification &spec) {
// Retry after a delay to emulate the existing Raylet reconstruction
// behaviour. TODO(ekl) backoff exponentially.
RAY_LOG(ERROR) << "Will resubmit task after a 5 second delay: "
@@ -301,8 +293,7 @@ void CoreWorker::SetCurrentTaskId(const TaskID &task_id) {
if (actor_id_.IsNil() && task_id.IsNil()) {
absl::MutexLock lock(&actor_handles_mutex_);
for (const auto &handle : actor_handles_) {
RAY_CHECK_OK(direct_actor_table_subscriber_->AsyncUnsubscribe(
subscribe_id_, handle.first, nullptr));
RAY_CHECK_OK(gcs_client_->Actors().AsyncUnsubscribe(handle.first, nullptr));
}
actor_handles_.clear();
}
@@ -749,8 +740,20 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle) {
// Register a callback to handle actor notifications.
auto actor_notification_callback = [this](const ActorID &actor_id,
const gcs::ActorTableData &actor_data) {
RAY_CHECK(actor_data.state() != gcs::ActorTableData::RECONSTRUCTING);
if (actor_data.state() == gcs::ActorTableData::DEAD) {
if (actor_data.state() == gcs::ActorTableData::RECONSTRUCTING) {
absl::MutexLock lock(&actor_handles_mutex_);
auto it = actor_handles_.find(actor_id);
RAY_CHECK(it != actor_handles_.end());
if (it->second->IsDirectCallActor()) {
// We have to reset the actor handle since the next instance of the
// actor will not have the last sequence number that we sent.
// TODO: Remove the check for direct calls. We do not reset for the
// raylet codepath because it tries to replay all tasks since the
// last actor checkpoint.
it->second->Reset();
}
direct_actor_submitter_->DisconnectActor(actor_id, false);
} else if (actor_data.state() == gcs::ActorTableData::DEAD) {
direct_actor_submitter_->DisconnectActor(actor_id, true);
ActorHandle *actor_handle = nullptr;
@@ -772,8 +775,8 @@ bool CoreWorker::AddActorHandle(std::unique_ptr<ActorHandle> actor_handle) {
<< ClientID::FromBinary(actor_data.address().raylet_id());
};
RAY_CHECK_OK(direct_actor_table_subscriber_->AsyncSubscribe(
subscribe_id_, actor_id, actor_notification_callback, nullptr));
RAY_CHECK_OK(gcs_client_->Actors().AsyncSubscribe(
actor_id, actor_notification_callback, nullptr));
}
return inserted;
}
-14
View File
@@ -586,17 +586,6 @@ class CoreWorker {
// Client to the GCS shared by core worker interfaces.
std::shared_ptr<gcs::RedisGcsClient> gcs_client_;
/// This is temporary fake node id that is used only by
/// `direct_actor_table_subscriber_ `.
/// TODO(micafan): remove `direct_actor_table_subscriber_` and
/// use `GcsClient` for actor subscription.
ClientID subscribe_id_{ClientID::FromRandom()};
// Client to listen to direct actor events.
std::unique_ptr<
gcs::SubscriptionExecutor<ActorID, gcs::ActorTableData, gcs::DirectActorTable>>
direct_actor_table_subscriber_;
// Client to the raylet shared by core worker interfaces. This needs to be a
// shared_ptr for direct calls because we can lease multiple workers through
// one client, and we need to keep the connection alive until we return all
@@ -628,9 +617,6 @@ class CoreWorker {
// Tracks the currently pending tasks.
std::shared_ptr<TaskManager> task_manager_;
// Interface for publishing actor creation.
std::shared_ptr<ActorManager> actor_manager_;
// Interface to submit tasks directly to other actors.
std::unique_ptr<CoreWorkerDirectActorTaskSubmitter> direct_actor_submitter_;
-8
View File
@@ -101,11 +101,6 @@ void TaskManager::CompletePendingTask(const TaskID &task_id,
}
}
if (spec.IsActorCreationTask()) {
RAY_CHECK(actor_addr != nullptr);
actor_manager_->PublishCreatedActor(spec, *actor_addr);
}
ShutdownIfNeeded();
}
@@ -208,9 +203,6 @@ void TaskManager::MarkPendingTaskFailed(const TaskID &task_id,
/*transport_type=*/static_cast<int>(TaskTransportType::DIRECT));
RAY_CHECK_OK(in_memory_store_->Put(RayObject(error_type), object_id));
}
if (spec.IsActorCreationTask()) {
actor_manager_->PublishTerminatedActor(spec);
}
}
TaskSpecification TaskManager::GetTaskSpec(const TaskID &task_id) const {
+9 -11
View File
@@ -993,18 +993,16 @@ TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodes) {
TestActorTask(resources, true);
}
// TODO(ekl) re-enable once reconstruction is implemented
// TEST_F(SingleNodeTest, TestDirectActorTaskLocalReconstruction) {
// std::unordered_map<std::string, double> resources;
// TestActorReconstruction(resources, true);
//}
TEST_F(SingleNodeTest, TestDirectActorTaskLocalReconstruction) {
std::unordered_map<std::string, double> resources;
TestActorReconstruction(resources, true);
}
// TODO(ekl) re-enable once reconstruction is implemented
// TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesReconstruction) {
// std::unordered_map<std::string, double> resources;
// resources.emplace("resource1", 1);
// TestActorReconstruction(resources, true);
//}
TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesReconstruction) {
std::unordered_map<std::string, double> resources;
resources.emplace("resource1", 1);
TestActorReconstruction(resources, true);
}
TEST_F(SingleNodeTest, TestDirectActorTaskLocalFailure) {
std::unordered_map<std::string, double> resources;
@@ -174,6 +174,7 @@ void CoreWorkerDirectTaskReceiver::Init(raylet::RayletClient &raylet_client,
waiter_.reset(new DependencyWaiterImpl(raylet_client));
rpc_address_ = rpc_address;
client_factory_ = client_factory;
local_raylet_client_ = raylet_client;
}
void CoreWorkerDirectTaskReceiver::SetMaxActorConcurrency(int max_concurrency) {
@@ -283,6 +284,15 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask(
}
}
}
if (task_spec.IsActorCreationTask()) {
RAY_LOG(INFO) << "Actor creation task finished, task_id: " << task_spec.TaskId()
<< ", actor_id: " << task_spec.ActorCreationId();
// Tell raylet that an actor creation task has finished execution, so that
// raylet can publish actor creation event to GCS, and mark this worker as
// actor, thus if this worker dies later raylet will reconstruct the actor.
RAY_CHECK_OK(local_raylet_client_->TaskDone());
}
}
if (status.IsSystemExit()) {
// Don't allow the worker to be reused, even though the reply status is OK.
@@ -509,6 +509,8 @@ class CoreWorkerDirectTaskReceiver {
/// The fiber semaphore used to limit the number of concurrent fibers
/// running at once.
std::shared_ptr<FiberRateLimiter> fiber_rate_limiter_;
boost::optional<raylet::RayletClient &> local_raylet_client_;
};
} // namespace ray
@@ -11,8 +11,9 @@ Status CoreWorkerDirectTaskSubmitter::SubmitTask(TaskSpecification task_spec) {
absl::MutexLock lock(&mu_);
// Note that the dependencies in the task spec are mutated to only contain
// plasma dependencies after ResolveDependencies finishes.
const SchedulingKey scheduling_key(task_spec.GetSchedulingClass(),
task_spec.GetDependencies());
const SchedulingKey scheduling_key(
task_spec.GetSchedulingClass(), task_spec.GetDependencies(),
task_spec.IsActorCreationTask() ? task_spec.ActorCreationId() : ActorID::Nil());
auto it = task_queues_.find(scheduling_key);
if (it == task_queues_.end()) {
it = task_queues_.emplace(scheduling_key, std::deque<TaskSpecification>()).first;
@@ -25,8 +25,12 @@ typedef std::function<std::shared_ptr<WorkerLeaseInterface>(const std::string &i
// (encapsulated in SchedulingClass) to defer resource allocation decisions to the raylet
// and ensure fairness between different tasks, as well as plasma task dependencies as
// a performance optimization because the raylet will fetch plasma dependencies to the
// scheduled worker.
using SchedulingKey = std::pair<SchedulingClass, std::vector<ObjectID>>;
// scheduled worker. It's also keyed on actor ID to ensure the actor creation task
// would always request a new worker lease. We need this to let raylet know about
// direct actor creation task, and reconstruct the actor if it dies. Otherwise if
// the actor creation task just reuses an existing worker, then raylet will not
// be aware of the actor and is not able to manage it.
using SchedulingKey = std::tuple<SchedulingClass, std::vector<ObjectID>, ActorID>;
// This class is thread-safe.
class CoreWorkerDirectTaskSubmitter {
+2 -1
View File
@@ -85,7 +85,8 @@ Status RedisActorInfoAccessor::AsyncUpdate(
// RECONSTRUCTING or DEAD entries have an odd index.
log_length += 1;
}
RAY_LOG(DEBUG) << "AsyncUpdate actor state to " << data_ptr->state()
<< ", actor id: " << actor_id << ", log_length: " << log_length;
auto on_success = [callback](RedisGcsClient *client, const ActorID &actor_id,
const ActorTableData &data) {
// If we successfully appended a record to the GCS table of the actor that
+29 -18
View File
@@ -692,8 +692,6 @@ void NodeManager::HeartbeatBatchAdded(const HeartbeatBatchTableData &heartbeat_b
HeartbeatAdded(client_id, heartbeat_data);
}
RAY_LOG(DEBUG) << "Total active object IDs received: " << active_object_ids.size();
// Refresh the active object IDs in plasma to prevent them from being evicted.
std::vector<plasma::ObjectID> plasma_ids;
plasma_ids.reserve(active_object_ids.size());
@@ -1058,7 +1056,8 @@ void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_loca
if (was_local && !status.ok()) {
// If the disconnected actor was local, only this node will try to update actor
// state. So the update shouldn't fail.
RAY_LOG(FATAL) << "Failed to update state for actor " << actor_id;
RAY_LOG(FATAL) << "Failed to update state for actor " << actor_id
<< ", status: " << status.ToString();
}
};
auto actor_notification = std::make_shared<ActorTableData>(new_actor_info);
@@ -1510,6 +1509,18 @@ void NodeManager::HandleWorkerLeaseRequest(const rpc::WorkerLeaseRequest &reques
rpc::Task task_message;
task_message.mutable_task_spec()->CopyFrom(request.resource_spec());
Task task(task_message);
bool is_actor_creation_task = task.GetTaskSpecification().IsActorCreationTask();
ActorID actor_id = ActorID::Nil();
if (is_actor_creation_task) {
actor_id = task.GetTaskSpecification().ActorCreationId();
// Save the actor creation task spec to GCS, which is needed to
// reconstruct the actor when raylet detect it dies.
std::shared_ptr<rpc::TaskTableData> data = std::make_shared<rpc::TaskTableData>();
data->mutable_task()->mutable_task_spec()->CopyFrom(
task.GetTaskSpecification().GetMessage());
RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(data, nullptr));
}
if (new_scheduler_enabled_) {
auto request_resources =
@@ -2204,7 +2215,7 @@ void NodeManager::AssignTask(const std::shared_ptr<Worker> &worker, const Task &
}
RAY_LOG(DEBUG) << "Assigning task " << spec.TaskId() << " to worker with pid "
<< worker->Pid();
<< worker->Pid() << ", worker id: " << worker->WorkerId();
flatbuffers::FlatBufferBuilder fbb;
// Resource accounting: acquire resources for the assigned task.
@@ -2235,6 +2246,9 @@ void NodeManager::AssignTask(const std::shared_ptr<Worker> &worker, const Task &
spec.IsActorCreationTask() ? worker->GetLifetimeResourceIds()
: worker->GetTaskResourceIds());
post_assign_callbacks->push_back([this, worker, task_id]() {
RAY_LOG(DEBUG) << "Finished assigning task " << task_id << " to worker "
<< worker->WorkerId();
FinishAssignTask(worker, task_id, /*success=*/true);
});
} else {
@@ -2276,7 +2290,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) {
worker.ResetTaskResourceIds();
const auto &spec = task.GetTaskSpecification();
if ((spec.IsActorCreationTask() || spec.IsActorTask()) && !spec.IsDirectCall()) {
if ((spec.IsActorCreationTask() || spec.IsActorTask())) {
// If this was an actor or actor creation task, handle the actor's new
// state.
FinishAssignedActorTask(worker, task);
@@ -2305,7 +2319,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) {
}
std::shared_ptr<ActorTableData> NodeManager::CreateActorTableDataFromCreationTask(
const TaskSpecification &task_spec, int port) {
const TaskSpecification &task_spec, int port, const WorkerID &worker_id) {
RAY_CHECK(task_spec.IsActorCreationTask());
auto actor_id = task_spec.ActorCreationId();
auto actor_entry = actor_registry_.find(actor_id);
@@ -2355,6 +2369,7 @@ std::shared_ptr<ActorTableData> NodeManager::CreateActorTableDataFromCreationTas
gcs_client_->Nodes().GetSelfInfo().node_manager_address());
actor_info_ptr->mutable_address()->set_port(port);
actor_info_ptr->mutable_address()->set_raylet_id(self_node_id_.Binary());
actor_info_ptr->mutable_address()->set_worker_id(worker_id.Binary());
actor_info_ptr->set_state(ActorTableData::ALIVE);
return actor_info_ptr;
}
@@ -2387,11 +2402,12 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
// Lookup the parent actor id.
auto parent_task_id = task_spec.ParentTaskId();
int port = worker.Port();
auto worker_id = worker.WorkerId();
RAY_CHECK_OK(
gcs_client_->Tasks().AsyncGet(
parent_task_id,
/*callback=*/
[this, task_spec, resumed_from_checkpoint, port, parent_task_id](
[this, task_spec, resumed_from_checkpoint, port, parent_task_id, worker_id](
Status status, const boost::optional<TaskTableData> &parent_task_data) {
if (parent_task_data) {
// The task was in the GCS task table. Use the stored task spec to
@@ -2404,7 +2420,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
parent_actor_id = parent_task.GetTaskSpecification().ActorId();
}
FinishAssignedActorCreationTask(parent_actor_id, task_spec,
resumed_from_checkpoint, port);
resumed_from_checkpoint, port, worker_id);
return;
}
// The parent task was not in the GCS task table. It should most likely be
@@ -2429,7 +2445,7 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
<< "ray.init(redis_max_memory=<max_memory_bytes>).";
}
FinishAssignedActorCreationTask(parent_actor_id, task_spec,
resumed_from_checkpoint, port);
resumed_from_checkpoint, port, worker_id);
}));
} else {
auto actor_entry = actor_registry_.find(actor_id);
@@ -2456,11 +2472,11 @@ void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) {
void NodeManager::FinishAssignedActorCreationTask(const ActorID &parent_actor_id,
const TaskSpecification &task_spec,
bool resumed_from_checkpoint,
int port) {
bool resumed_from_checkpoint, int port,
const WorkerID &worker_id) {
// Notify the other node managers that the actor has been created.
const ActorID actor_id = task_spec.ActorCreationId();
auto new_actor_info = CreateActorTableDataFromCreationTask(task_spec, port);
auto new_actor_info = CreateActorTableDataFromCreationTask(task_spec, port, worker_id);
new_actor_info->set_parent_id(parent_actor_id.Binary());
auto update_callback = [actor_id](Status status) {
if (!status.ok()) {
@@ -2557,11 +2573,6 @@ void NodeManager::ResubmitTask(const Task &task, const ObjectID &required_object
RAY_LOG(DEBUG) << "Attempting to resubmit task "
<< task.GetTaskSpecification().TaskId();
if (task.GetTaskSpecification().IsDirectCall()) {
TreatTaskAsFailed(task, ErrorType::OBJECT_UNRECONSTRUCTABLE);
return;
}
// Actors should only be recreated if the first initialization failed or if
// the most recent instance of the actor failed.
if (task.GetTaskSpecification().IsActorCreationTask()) {
@@ -2823,6 +2834,7 @@ void NodeManager::ForwardTask(
void NodeManager::FinishAssignTask(const std::shared_ptr<Worker> &worker,
const TaskID &task_id, bool success) {
RAY_LOG(DEBUG) << "FinishAssignTask: " << task_id;
// Remove the ASSIGNED task from the READY queue.
Task assigned_task;
TaskState state;
@@ -2831,7 +2843,6 @@ void NodeManager::FinishAssignTask(const std::shared_ptr<Worker> &worker,
return;
}
RAY_CHECK(state == TaskState::READY);
if (success) {
auto spec = assigned_task.GetTaskSpecification();
// We successfully assigned the task to the worker.
+3 -2
View File
@@ -234,7 +234,7 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// actor.
/// \param worker The port that the actor is listening on.
std::shared_ptr<ActorTableData> CreateActorTableDataFromCreationTask(
const TaskSpecification &task_spec, int port);
const TaskSpecification &task_spec, int port, const WorkerID &worker_id);
/// Handle a worker finishing an assigned actor task or actor creation task.
/// \param worker The worker that finished the task.
/// \param task The actor task or actor creation task.
@@ -252,7 +252,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler {
/// \return Void.
void FinishAssignedActorCreationTask(const ActorID &parent_actor_id,
const TaskSpecification &task_spec,
bool resumed_from_checkpoint, int port);
bool resumed_from_checkpoint, int port,
const WorkerID &worker_id);
/// Make a placement decision for placeable tasks given the resource_map
/// provided. This will perform task state transitions and task forwarding.
///
+30 -1
View File
@@ -310,8 +310,37 @@ std::vector<TaskID> TaskDependencyManager::GetPendingTasks() const {
void TaskDependencyManager::TaskPending(const Task &task) {
// Direct tasks are not tracked by the raylet.
// NOTE(zhijunfu): Direct tasks are not tracked by the raylet,
// but we still need raylet to reconstruct the actors.
// For direct actor creation task:
// - Initially the caller leases a worker from raylet and
// then pushes actor creation task directly to the worker,
// thus it doesn't need task lease. And actually if we
// acquire a lease in this case and forget to cancel it,
// the lease would never expire which will prevent the
// actor from being reconstructed;
// - When a direct actor is reconstructed, raylet resubmits
// the task, and the task can be forwarded to another raylet,
// and eventually assigned to a worker. In this case we need
// the task lease to make sure there's only one raylet can
// resubmit the task.
if (task.GetTaskSpecification().IsDirectCall()) {
return;
// We can use `OnDispatch` to differeniate whether this task is
// a worker lease request.
// For direct actor creation task:
// - when it's submitted by core worker, we guarantee that
// we always request a new worker lease, in that case
// `OnDispatch` is overriden to an actual callback.
// - when it's resubmitted by raylet because of reconstruction,
// `OnDispatch` will not be overriden and thus is nullptr.
if (task.GetTaskSpecification().IsActorCreationTask() &&
task.OnDispatch() == nullptr) {
// This is an actor creation task, and it's being reconstructed,
// in this case we still need the task lease. Note that we don't
// require task lease for direct actor creation task.
} else {
return;
}
}
TaskID task_id = task.GetTaskSpecification().TaskId();