diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index a4c65f97f..2f37e48f0 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -600,7 +600,7 @@ class NodeStats(threading.Thread): "jobId": ray.utils.binary_to_hex( actor_data.job_id), "state": actor_data.state, - "isDirectCall": actor_data.is_direct_call, + "isDirectCall": True, "timestamp": actor_data.timestamp } else: diff --git a/python/ray/state.py b/python/ray/state.py index 453530bbf..5985d0561 100644 --- a/python/ray/state.py +++ b/python/ray/state.py @@ -333,7 +333,7 @@ class GlobalState: "IPAddress": actor_table_data.owner_address.ip_address, "Port": actor_table_data.owner_address.port }, - "IsDirectCall": actor_table_data.is_direct_call, + "IsDirectCall": True, "State": actor_table_data.state, "Timestamp": actor_table_data.timestamp, } diff --git a/src/ray/common/task/task_spec.cc b/src/ray/common/task/task_spec.cc index 79b6c0ab3..176cf55c5 100644 --- a/src/ray/common/task/task_spec.cc +++ b/src/ray/common/task/task_spec.cc @@ -221,16 +221,6 @@ ObjectID TaskSpecification::ActorDummyObject() const { return ReturnId(NumReturns() - 1, TaskTransportType::RAYLET); } -bool TaskSpecification::IsDirectCall() const { return message_->is_direct_call(); } - -bool TaskSpecification::IsDirectActorCreationCall() const { - if (IsActorCreationTask()) { - return message_->actor_creation_task_spec().is_direct_call(); - } else { - return false; - } -} - int TaskSpecification::MaxActorConcurrency() const { RAY_CHECK(IsActorCreationTask()); return message_->actor_creation_task_spec().max_concurrency(); @@ -261,7 +251,6 @@ std::string TaskSpecification::DebugString() const { // Print actor creation task spec. 
stream << ", actor_creation_task_spec={actor_id=" << ActorCreationId() << ", max_reconstructions=" << MaxActorReconstructions() - << ", is_direct_call=" << IsDirectCall() << ", max_concurrency=" << MaxActorConcurrency() << ", is_asyncio_actor=" << IsAsyncioActor() << ", is_detached=" << IsDetachedActor() << "}"; diff --git a/src/ray/common/task/task_spec.h b/src/ray/common/task/task_spec.h index a4f26ef61..94ebae426 100644 --- a/src/ray/common/task/task_spec.h +++ b/src/ray/common/task/task_spec.h @@ -159,8 +159,6 @@ class TaskSpecification : public MessageWrapper { bool IsDirectCall() const; - bool IsDirectActorCreationCall() const; - int MaxActorConcurrency() const; bool IsAsyncioActor() const; diff --git a/src/ray/common/task/task_util.h b/src/ray/common/task/task_util.h index db3f8aa13..b1100e6bc 100644 --- a/src/ray/common/task/task_util.h +++ b/src/ray/common/task/task_util.h @@ -27,7 +27,7 @@ class TaskSpecBuilder { const TaskID &task_id, const Language &language, const ray::FunctionDescriptor &function_descriptor, const JobID &job_id, const TaskID &parent_task_id, uint64_t parent_counter, const TaskID &caller_id, - const rpc::Address &caller_address, uint64_t num_returns, bool is_direct_call, + const rpc::Address &caller_address, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources) { message_->set_type(TaskType::NORMAL_TASK); @@ -40,7 +40,6 @@ class TaskSpecBuilder { message_->set_caller_id(caller_id.Binary()); message_->mutable_caller_address()->CopyFrom(caller_address); message_->set_num_returns(num_returns); - message_->set_is_direct_call(is_direct_call); message_->mutable_required_resources()->insert(required_resources.begin(), required_resources.end()); message_->mutable_required_placement_resources()->insert( @@ -65,7 +64,6 @@ class TaskSpecBuilder { message_->set_caller_id(caller_id.Binary()); message_->mutable_caller_address()->CopyFrom(caller_address); 
message_->set_num_returns(0); - message_->set_is_direct_call(false); return *this; } @@ -105,8 +103,7 @@ class TaskSpecBuilder { TaskSpecBuilder &SetActorCreationTaskSpec( const ActorID &actor_id, uint64_t max_reconstructions = 0, const std::vector &dynamic_worker_options = {}, - bool is_direct_call = false, int max_concurrency = 1, bool is_detached = false, - bool is_asyncio = false) { + int max_concurrency = 1, bool is_detached = false, bool is_asyncio = false) { message_->set_type(TaskType::ACTOR_CREATION_TASK); auto actor_creation_spec = message_->mutable_actor_creation_task_spec(); actor_creation_spec->set_actor_id(actor_id.Binary()); @@ -114,7 +111,6 @@ class TaskSpecBuilder { for (const auto &option : dynamic_worker_options) { actor_creation_spec->add_dynamic_worker_options(option); } - actor_creation_spec->set_is_direct_call(is_direct_call); actor_creation_spec->set_max_concurrency(max_concurrency); actor_creation_spec->set_is_asyncio(is_asyncio); actor_creation_spec->set_is_detached(is_detached); diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index 5b00b60df..cd4998e8c 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -103,13 +103,13 @@ void WorkerContext::SetCurrentTask(const TaskSpecification &task_spec) { if (task_spec.IsNormalTask()) { RAY_CHECK(current_job_id_.IsNil()); SetCurrentJobId(task_spec.JobId()); - current_task_is_direct_call_ = task_spec.IsDirectCall(); + current_task_is_direct_call_ = true; } else if (task_spec.IsActorCreationTask()) { RAY_CHECK(current_job_id_.IsNil()); SetCurrentJobId(task_spec.JobId()); RAY_CHECK(current_actor_id_.IsNil()); current_actor_id_ = task_spec.ActorCreationId(); - current_actor_is_direct_call_ = task_spec.IsDirectActorCreationCall(); + current_actor_is_direct_call_ = true; current_actor_max_concurrency_ = task_spec.MaxActorConcurrency(); current_actor_is_asyncio_ = task_spec.IsAsyncioActor(); } else if (task_spec.IsActorTask()) { diff --git 
a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 2a01d2ef8..3c07fe5d7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -40,8 +40,7 @@ void BuildCommonTaskSpec( builder.SetCommonTaskSpec(task_id, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, current_task_id, task_index, caller_id, address, num_returns, - /*is_direct_transport_type=*/true, required_resources, - required_placement_resources); + required_resources, required_placement_resources); // Set task arguments. for (const auto &arg : args) { if (arg.IsPassedByReference()) { @@ -827,11 +826,11 @@ Status CoreWorker::CreateActor(const RayFunction &function, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, 1, actor_creation_options.resources, actor_creation_options.placement_resources, &return_ids); - builder.SetActorCreationTaskSpec( - actor_id, actor_creation_options.max_reconstructions, - actor_creation_options.dynamic_worker_options, - /*is_direct_call=*/true, actor_creation_options.max_concurrency, - actor_creation_options.is_detached, actor_creation_options.is_asyncio); + builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_reconstructions, + actor_creation_options.dynamic_worker_options, + actor_creation_options.max_concurrency, + actor_creation_options.is_detached, + actor_creation_options.is_asyncio); *return_actor_id = actor_id; TaskSpecification task_spec = builder.Build(); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index a921e1bfd..402f4e23d 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -643,8 +643,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { TaskSpecBuilder builder; builder.SetCommonTaskSpec(RandomTaskId(), function.GetLanguage(), function.GetFunctionDescriptor(), job_id, RandomTaskId(), 0, - RandomTaskId(), address, 
num_returns, /*is_direct*/ false, - resources, resources); + RandomTaskId(), address, num_returns, resources, resources); // Set task arguments. for (const auto &arg : args) { if (arg.IsPassedByReference()) { diff --git a/src/ray/core_worker/test/direct_task_transport_test.cc b/src/ray/core_worker/test/direct_task_transport_test.cc index 259a2c20e..6c3502a82 100644 --- a/src/ray/core_worker/test/direct_task_transport_test.cc +++ b/src/ray/core_worker/test/direct_task_transport_test.cc @@ -293,7 +293,7 @@ TaskSpecification BuildTaskSpec(const std::unordered_map &r rpc::Address empty_address; builder.SetCommonTaskSpec(TaskID::Nil(), Language::PYTHON, function_descriptor, JobID::Nil(), TaskID::Nil(), 0, TaskID::Nil(), empty_address, - 1, true, resources, resources); + 1, resources, resources); return builder.Build(); } diff --git a/src/ray/gcs/pb_util.h b/src/ray/gcs/pb_util.h index d568db9a0..a930e4bc3 100644 --- a/src/ray/gcs/pb_util.h +++ b/src/ray/gcs/pb_util.h @@ -74,7 +74,6 @@ inline std::shared_ptr CreateActorTableData( actor_info_ptr->set_is_detached(task_spec.IsDetachedActor()); // Set the fields that change when the actor is restarted. actor_info_ptr->set_remaining_reconstructions(remaining_reconstructions); - actor_info_ptr->set_is_direct_call(task_spec.IsDirectCall()); actor_info_ptr->mutable_address()->CopyFrom(address); actor_info_ptr->mutable_owner_address()->CopyFrom( task_spec.GetMessage().caller_address()); diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index 97797b6c8..22ae0db7d 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -113,10 +113,8 @@ message TaskSpec { // Task specification for an actor task. // This field is only valid when `type == ACTOR_TASK`. ActorTaskSpec actor_task_spec = 15; - // Whether this task is a direct call task. - bool is_direct_call = 16; // Number of times this task may be retried on worker failure. 
- int32 max_retries = 17; + int32 max_retries = 16; } // Argument in the task. @@ -146,14 +144,12 @@ message ActorCreationTaskSpec { // the placeholder strings (`RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER_0`, // `RAY_WORKER_DYNAMIC_OPTION_PLACEHOLDER_1`, etc) in the worker command. repeated string dynamic_worker_options = 4; - // Whether direct actor call is used. - bool is_direct_call = 5; // The max number of concurrent calls for direct call actors. - int32 max_concurrency = 6; + int32 max_concurrency = 5; // Whether the actor is persistent - bool is_detached = 7; + bool is_detached = 6; // Whether the actor use async actor calls - bool is_asyncio = 8; + bool is_asyncio = 7; } // Task spec of an actor task. diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index a829eb678..0eb56098c 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -127,12 +127,10 @@ message ActorTableData { Address address = 9; // The address of the the actor's owner (parent). Address owner_address = 10; - // Whether direct actor call is used. - bool is_direct_call = 11; // Whether the actor is persistent. - bool is_detached = 12; + bool is_detached = 11; // Timestamp that the actor is created or reconstructed. - double timestamp = 13; + double timestamp = 12; } message ErrorTableData { diff --git a/src/ray/raylet/lineage_cache.cc b/src/ray/raylet/lineage_cache.cc index fbf1de800..f99700318 100644 --- a/src/ray/raylet/lineage_cache.cc +++ b/src/ray/raylet/lineage_cache.cc @@ -174,15 +174,13 @@ LineageCache::LineageCache(const ClientID &self_node_id, /// A helper function to add some uncommitted lineage to the local cache. void LineageCache::AddUncommittedLineage(const TaskID &task_id, const Lineage &uncommitted_lineage) { + // TODO(edoakes): remove this method, it's currently only called in unit tests. 
RAY_LOG(DEBUG) << "Adding uncommitted task " << task_id << " on " << self_node_id_; // If the entry is not found in the lineage to merge, then we stop since // there is nothing to copy into the merged lineage. auto entry = uncommitted_lineage.GetEntry(task_id); if (!entry) { return; - } else if (entry->TaskData().GetTaskSpecification().IsDirectCall()) { - // Disable lineage logging for direct tasks. - return; } RAY_CHECK(entry->GetStatus() == GcsStatus::UNCOMMITTED); @@ -200,10 +198,7 @@ void LineageCache::AddUncommittedLineage(const TaskID &task_id, } bool LineageCache::CommitTask(const Task &task) { - if (task.GetTaskSpecification().IsDirectCall()) { - // Disable lineage logging for direct tasks. - return true; - } + // TODO(edoakes): remove this method, it's currently only called in unit tests. const TaskID task_id = task.GetTaskSpecification().TaskId(); RAY_LOG(DEBUG) << "Committing task " << task_id << " on " << self_node_id_; diff --git a/src/ray/raylet/lineage_cache_test.cc b/src/ray/raylet/lineage_cache_test.cc index f380c5378..013095742 100644 --- a/src/ray/raylet/lineage_cache_test.cc +++ b/src/ray/raylet/lineage_cache_test.cc @@ -197,7 +197,7 @@ static inline Task ExampleTask(const std::vector &arguments, builder.SetCommonTaskSpec(RandomTaskId(), Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""), JobID::Nil(), RandomTaskId(), 0, RandomTaskId(), address, - num_returns, false, {}, {}); + num_returns, {}, {}); for (const auto &arg : arguments) { builder.AddByRefArg(arg); } diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 7ca684766..35b901b2d 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1439,18 +1439,8 @@ void NodeManager::ProcessPrepareActorCheckpointRequest( std::shared_ptr worker = worker_pool_.GetRegisteredWorker(client); RAY_CHECK(worker && worker->GetActorId() == actor_id); - std::shared_ptr checkpoint_data; - if 
(actor_entry->second.GetTableData().is_direct_call()) { - checkpoint_data = - actor_entry->second.GenerateCheckpointData(actor_entry->first, nullptr); - } else { - // Find the task that is running on this actor. - const auto task_id = worker->GetAssignedTaskId(); - const Task &task = local_queues_.GetTaskOfState(task_id, TaskState::RUNNING); - // Generate checkpoint data. - checkpoint_data = - actor_entry->second.GenerateCheckpointData(actor_entry->first, &task); - } + std::shared_ptr checkpoint_data = + actor_entry->second.GenerateCheckpointData(actor_entry->first, nullptr); // Write checkpoint data to GCS. RAY_CHECK_OK(gcs_client_->Actors().AsyncAddCheckpoint( @@ -2005,17 +1995,6 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag return; } - // Add the task and its uncommitted lineage to the lineage cache. - if (forwarded) { - lineage_cache_.AddUncommittedLineage(task_id, uncommitted_lineage); - } else { - if (!lineage_cache_.CommitTask(task)) { - RAY_LOG(WARNING) - << "Task " << task_id - << " already committed to the GCS. This is most likely due to reconstruction."; - } - } - if (spec.IsActorTask()) { // Check whether we know the location of the actor. const auto actor_entry = actor_registry_.find(spec.ActorId()); @@ -2464,7 +2443,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) { if (!spec.IsActorCreationTask() && !spec.IsActorTask()) { worker.AssignJobId(JobID::Nil()); } - if (!spec.IsDirectActorCreationCall()) { + if (!spec.IsActorCreationTask()) { // Unset the worker's assigned task. We keep the assigned task ID for // direct actor creation calls because this ID is used later if the actor // requires objects from plasma. @@ -2473,7 +2452,7 @@ bool NodeManager::FinishAssignedTask(Worker &worker) { } // Direct actors will be assigned tasks via the core worker and therefore are // not idle. 
- return !spec.IsDirectActorCreationCall(); + return !spec.IsActorCreationTask(); } std::shared_ptr NodeManager::CreateActorTableDataFromCreationTask( @@ -2498,7 +2477,6 @@ std::shared_ptr NodeManager::CreateActorTableDataFromCreationTas // This is the first time that the actor has been created, so the number // of remaining reconstructions is the max. actor_info_ptr->set_remaining_reconstructions(task_spec.MaxActorReconstructions()); - actor_info_ptr->set_is_direct_call(task_spec.IsDirectActorCreationCall()); actor_info_ptr->set_is_detached(task_spec.IsDetachedActor()); actor_info_ptr->mutable_owner_address()->CopyFrom( task_spec.GetMessage().caller_address()); @@ -2807,12 +2785,12 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) { } } -bool NodeManager::IsDirectActorCreationTask(const TaskID &task_id) { +bool NodeManager::IsActorCreationTask(const TaskID &task_id) { auto actor_id = task_id.ActorId(); if (!actor_id.IsNil() && task_id == TaskID::ForActorCreationTask(actor_id)) { // This task ID corresponds to an actor creation task. auto iter = actor_registry_.find(actor_id); - if (iter != actor_registry_.end() && iter->second.GetTableData().is_direct_call()) { + if (iter != actor_registry_.end()) { // This actor is direct call actor. return true; } @@ -2854,7 +2832,7 @@ void NodeManager::HandleObjectMissing(const ObjectID &object_id) { // So here we check for direct actor creation task explicitly to allow this case. 
auto iter = waiting_task_id_set.begin(); while (iter != waiting_task_id_set.end()) { - if (IsDirectActorCreationTask(*iter)) { + if (IsActorCreationTask(*iter)) { RAY_LOG(DEBUG) << "Ignoring direct actor creation task " << *iter << " when handling object missing for " << object_id; iter = waiting_task_id_set.erase(iter); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 4ad52a777..74a1fecd5 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -617,8 +617,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// Repeat the process as long as we can schedule a task. void NewSchedulerSchedulePendingTasks(); - /// Whether a task is an direct actor creation task. - bool IsDirectActorCreationTask(const TaskID &task_id); + /// Whether a task is an actor creation task. + bool IsActorCreationTask(const TaskID &task_id); /// ID of this node. ClientID self_node_id_; diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index 799fe5132..cefcf3442 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -337,23 +337,21 @@ void TaskDependencyManager::TaskPending(const Task &task) { // and eventually assigned to a worker. In this case we need // the task lease to make sure there's only one raylet can // resubmit the task. - if (task.GetTaskSpecification().IsDirectCall()) { - // We can use `OnDispatch` to differeniate whether this task is - // a worker lease request. - // For direct actor creation task: - // - when it's submitted by core worker, we guarantee that - // we always request a new worker lease, in that case - // `OnDispatch` is overriden to an actual callback. - // - when it's resubmitted by raylet because of reconstruction, - // `OnDispatch` will not be overriden and thus is nullptr. 
- if (task.GetTaskSpecification().IsActorCreationTask() && - task.OnDispatch() == nullptr) { - // This is an actor creation task, and it's being reconstructed, - // in this case we still need the task lease. Note that we don't - // require task lease for direct actor creation task. - } else { - return; - } + // + // We can use `OnDispatch` to differentiate whether this task is + // a worker lease request. + // For direct actor creation task: + // - when it's submitted by core worker, we guarantee that + // we always request a new worker lease, in that case + // `OnDispatch` is overridden to an actual callback. + // - when it's resubmitted by raylet because of reconstruction, + // `OnDispatch` will not be overridden and thus is nullptr. + if (task.GetTaskSpecification().IsActorCreationTask() && task.OnDispatch() == nullptr) { + // This is an actor creation task, and it's being reconstructed, + // in this case we still need the task lease. Note that we don't + // require task lease for direct actor creation task. + } else { + return; } TaskID task_id = task.GetTaskSpecification().TaskId(); diff --git a/src/ray/raylet/task_dependency_manager_test.cc b/src/ray/raylet/task_dependency_manager_test.cc index f63561c77..aef9915c8 100644 --- a/src/ray/raylet/task_dependency_manager_test.cc +++ b/src/ray/raylet/task_dependency_manager_test.cc @@ -110,7 +110,8 @@ static inline Task ExampleTask(const std::vector &arguments, builder.SetCommonTaskSpec(RandomTaskId(), Language::PYTHON, FunctionDescriptorBuilder::BuildPython("", "", "", ""), JobID::Nil(), RandomTaskId(), 0, RandomTaskId(), address, - num_returns, false, {}, {}); + num_returns, {}, {}); + builder.SetActorCreationTaskSpec(ActorID::Nil(), 1, {}, 1, false, false); for (const auto &arg : arguments) { builder.AddByRefArg(arg); }