diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 2995a7155..8280995f9 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -828,7 +828,7 @@ cdef class CoreWorker: with self.profile_event(b"submit_task"): prepare_resources(resources, &c_resources) task_options = CTaskOptions( - num_return_vals, True, c_resources) + num_return_vals, c_resources) ray_function = CRayFunction( language.lang, function_descriptor.descriptor) prepare_args(self, args, &args_vector) @@ -870,7 +870,7 @@ cdef class CoreWorker: check_status(self.core_worker.get().CreateActor( ray_function, args_vector, CActorCreationOptions( - max_reconstructions, True, max_concurrency, + max_reconstructions, max_concurrency, c_resources, c_placement_resources, dynamic_worker_options, is_detached, is_asyncio), extension_data, @@ -897,7 +897,7 @@ cdef class CoreWorker: with self.profile_event(b"submit_task"): if num_method_cpus > 0: c_resources[b"CPU"] = num_method_cpus - task_options = CTaskOptions(num_return_vals, False, c_resources) + task_options = CTaskOptions(num_return_vals, c_resources) ray_function = CRayFunction( language.lang, function_descriptor.descriptor) prepare_args(self, args, &args_vector) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index cfbb72d11..da38c83b6 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -217,13 +217,13 @@ cdef extern from "ray/core_worker/common.h" nogil: cdef cppclass CTaskOptions "ray::TaskOptions": CTaskOptions() - CTaskOptions(int num_returns, c_bool is_direct_call, + CTaskOptions(int num_returns, unordered_map[c_string, double] &resources) cdef cppclass CActorCreationOptions "ray::ActorCreationOptions": CActorCreationOptions() CActorCreationOptions( - uint64_t max_reconstructions, c_bool is_direct_call, + uint64_t max_reconstructions, int32_t max_concurrency, const unordered_map[c_string, double] &resources, const unordered_map[c_string, double] &placement_resources, diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc index 015be2c91..1907bfef2 100644 --- a/src/ray/core_worker/actor_handle.cc +++ b/src/ray/core_worker/actor_handle.cc @@ -21,7 +21,7 @@ namespace { ray::rpc::ActorHandle CreateInnerActorHandle( const class ActorID &actor_id, const TaskID &owner_id, const ray::rpc::Address &owner_address, const class JobID &job_id, - const ObjectID &initial_cursor, const Language actor_language, bool is_direct_call, + const ObjectID &initial_cursor, const Language actor_language, const ray::FunctionDescriptor &actor_creation_task_function_descriptor, const std::string &extension_data) { ray::rpc::ActorHandle inner; @@ -33,7 +33,6 @@ ray::rpc::ActorHandle CreateInnerActorHandle( *inner.mutable_actor_creation_task_function_descriptor() = actor_creation_task_function_descriptor->GetMessage(); inner.set_actor_cursor(initial_cursor.Binary()); - inner.set_is_direct_call(is_direct_call); inner.set_extension_data(extension_data); return inner; } @@ -51,25 +50,23 @@ namespace ray { ActorHandle::ActorHandle( const class ActorID &actor_id, const TaskID &owner_id, const rpc::Address &owner_address, const class JobID &job_id, - const ObjectID &initial_cursor, const Language actor_language, bool is_direct_call, + const ObjectID &initial_cursor, const Language actor_language, const ray::FunctionDescriptor &actor_creation_task_function_descriptor, const std::string &extension_data) : ActorHandle(CreateInnerActorHandle( actor_id, owner_id, owner_address, job_id, initial_cursor, actor_language, - is_direct_call, actor_creation_task_function_descriptor, extension_data)) {} + actor_creation_task_function_descriptor, extension_data)) {} ActorHandle::ActorHandle(const std::string &serialized) : ActorHandle(CreateInnerActorHandleFromString(serialized)) {} -void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, - const TaskTransportType transport_type, - const ObjectID new_cursor) { +void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_cursor) { absl::MutexLock guard(&mutex_); // Build actor task spec. const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(GetActorID()); - const ObjectID actor_creation_dummy_object_id = - ObjectID::ForTaskReturn(actor_creation_task_id, /*index=*/1, - /*transport_type=*/static_cast(transport_type)); + const ObjectID actor_creation_dummy_object_id = ObjectID::ForTaskReturn( + actor_creation_task_id, /*index=*/1, + /*transport_type=*/static_cast(TaskTransportType::DIRECT)); builder.SetActorTaskSpec(GetActorID(), actor_creation_dummy_object_id, /*previous_actor_task_dummy_object_id=*/actor_cursor_, task_counter_++); diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h index ab0a02f5b..5989aff6e 100644 --- a/src/ray/core_worker/actor_handle.h +++ b/src/ray/core_worker/actor_handle.h @@ -35,7 +35,6 @@ class ActorHandle { ActorHandle(const ActorID &actor_id, const TaskID &owner_id, const rpc::Address &owner_address, const JobID &job_id, const ObjectID &initial_cursor, const Language actor_language, - bool is_direct_call, const ray::FunctionDescriptor &actor_creation_task_function_descriptor, const std::string &extension_data); @@ -61,10 +60,7 @@ class ActorHandle { std::string ExtensionData() const { return inner_.extension_data(); } - bool IsDirectCallActor() const { return inner_.is_direct_call(); } - - void SetActorTaskSpec(TaskSpecBuilder &builder, const TaskTransportType transport_type, - const ObjectID new_cursor); + void SetActorTaskSpec(TaskSpecBuilder &builder, const ObjectID new_cursor); void Serialize(std::string *output); diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 1309b10f1..40fc0278b 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -99,14 +99,11 @@ class TaskArg { /// Options for all tasks (actor and non-actor) except for actor creation. struct TaskOptions { TaskOptions() {} - TaskOptions(int num_returns, bool is_direct_call, - std::unordered_map &resources) - : num_returns(num_returns), is_direct_call(is_direct_call), resources(resources) {} + TaskOptions(int num_returns, std::unordered_map &resources) + : num_returns(num_returns), resources(resources) {} /// Number of returns of this task. int num_returns = 1; - /// Whether to use the direct task transport. - bool is_direct_call = false; /// Resources required by this task. std::unordered_map resources; }; @@ -114,14 +111,12 @@ struct TaskOptions { /// Options for actor creation tasks. struct ActorCreationOptions { ActorCreationOptions() {} - ActorCreationOptions(uint64_t max_reconstructions, bool is_direct_call, - int max_concurrency, + ActorCreationOptions(uint64_t max_reconstructions, int max_concurrency, const std::unordered_map &resources, const std::unordered_map &placement_resources, const std::vector &dynamic_worker_options, bool is_detached, bool is_asyncio) : max_reconstructions(max_reconstructions), - is_direct_call(is_direct_call), max_concurrency(max_concurrency), resources(resources), placement_resources(placement_resources), @@ -132,9 +127,6 @@ struct ActorCreationOptions { /// Maximum number of times that the actor should be reconstructed when it dies /// unexpectedly. It must be non-negative. If it's 0, the actor won't be reconstructed. const uint64_t max_reconstructions = 0; - /// Whether to use direct actor call. If this is set to true, callers will submit - /// tasks directly to the created actor without going through raylet. - const bool is_direct_call = false; /// The max number of concurrent tasks to run on this direct call actor. const int max_concurrency = 1; /// Resources required by the whole lifetime of this actor. @@ -147,7 +139,7 @@ struct ActorCreationOptions { /// Whether to keep the actor persistent after driver exit. If true, this will set /// the worker to not be destroyed after the driver shutdown. const bool is_detached = false; - /// Whether to use async mode of direct actor call. is_direct_call must be true. + /// Whether to use async mode of direct actor call. const bool is_asyncio = false; }; diff --git a/src/ray/core_worker/context.cc b/src/ray/core_worker/context.cc index c88b51dfb..5b00b60df 100644 --- a/src/ray/core_worker/context.cc +++ b/src/ray/core_worker/context.cc @@ -147,6 +147,7 @@ bool WorkerContext::ShouldReleaseResourcesOnBlockingCalls() const { CurrentThreadIsMain(); } +// TODO(edoakes): simplify these checks now that we only support direct call mode. bool WorkerContext::CurrentActorIsDirectCall() const { return current_actor_is_direct_call_; } diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index e59232e34..5dea9ca3d 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -35,13 +35,13 @@ void BuildCommonTaskSpec( const std::vector &args, uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, - ray::TaskTransportType transport_type, std::vector *return_ids) { + std::vector *return_ids) { // Build common task spec. builder.SetCommonTaskSpec(task_id, function.GetLanguage(), function.GetFunctionDescriptor(), job_id, current_task_id, task_index, caller_id, address, num_returns, - transport_type == ray::TaskTransportType::DIRECT, - required_resources, required_placement_resources); + /*is_direct_transport_type=*/true, required_resources, + required_placement_resources); // Set task arguments. for (const auto &arg : args) { if (arg.IsPassedByReference()) { @@ -54,9 +54,8 @@ void BuildCommonTaskSpec( // Compute return IDs. return_ids->resize(num_returns); for (size_t i = 0; i < num_returns; i++) { - (*return_ids)[i] = - ObjectID::ForTaskReturn(task_id, i + 1, - /*transport_type=*/static_cast(transport_type)); + (*return_ids)[i] = ObjectID::ForTaskReturn( + task_id, i + 1, static_cast(ray::TaskTransportType::DIRECT)); } } @@ -799,21 +798,14 @@ Status CoreWorker::SubmitTask(const RayFunction &function, const std::unordered_map required_resources; // TODO(ekl) offload task building onto a thread pool for performance - BuildCommonTaskSpec( - builder, worker_context_.GetCurrentJobID(), task_id, - worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, - function, args, task_options.num_returns, task_options.resources, - required_resources, - task_options.is_direct_call ? TaskTransportType::DIRECT : TaskTransportType::RAYLET, - return_ids); + BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, + worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), + rpc_address_, function, args, task_options.num_returns, + task_options.resources, required_resources, return_ids); TaskSpecification task_spec = builder.Build(); - if (task_options.is_direct_call) { - task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, - CurrentCallSite(), max_retries); - return direct_task_submitter_->SubmitTask(task_spec); - } else { - return local_raylet_client_->SubmitTask(task_spec); - } + task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, CurrentCallSite(), + max_retries); + return direct_task_submitter_->SubmitTask(task_spec); } Status CoreWorker::CreateActor(const RayFunction &function, @@ -832,33 +824,24 @@ Status CoreWorker::CreateActor(const RayFunction &function, BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, 1, actor_creation_options.resources, - actor_creation_options.placement_resources, - actor_creation_options.is_direct_call ? TaskTransportType::DIRECT - : TaskTransportType::RAYLET, - &return_ids); + actor_creation_options.placement_resources, &return_ids); builder.SetActorCreationTaskSpec( actor_id, actor_creation_options.max_reconstructions, actor_creation_options.dynamic_worker_options, - actor_creation_options.is_direct_call, actor_creation_options.max_concurrency, + /*is_direct_call=*/true, actor_creation_options.max_concurrency, actor_creation_options.is_detached, actor_creation_options.is_asyncio); *return_actor_id = actor_id; TaskSpecification task_spec = builder.Build(); - Status status; - if (actor_creation_options.is_direct_call) { - task_manager_->AddPendingTask( - GetCallerId(), rpc_address_, task_spec, CurrentCallSite(), - std::max(RayConfig::instance().actor_creation_min_retries(), - actor_creation_options.max_reconstructions)); - status = direct_task_submitter_->SubmitTask(task_spec); - } else { - status = local_raylet_client_->SubmitTask(task_spec); - } + task_manager_->AddPendingTask( + GetCallerId(), rpc_address_, task_spec, CurrentCallSite(), + std::max(RayConfig::instance().actor_creation_min_retries(), + actor_creation_options.max_reconstructions)); + Status status = direct_task_submitter_->SubmitTask(task_spec); std::unique_ptr actor_handle(new ActorHandle( actor_id, GetCallerId(), rpc_address_, job_id, /*actor_cursor=*/return_ids[0], - function.GetLanguage(), actor_creation_options.is_direct_call, - function.GetFunctionDescriptor(), extension_data)); + function.GetLanguage(), function.GetFunctionDescriptor(), extension_data)); RAY_CHECK(AddActorHandle(std::move(actor_handle), /*is_owner_handle=*/!actor_creation_options.is_detached)) << "Actor " << actor_id << " already exists"; @@ -875,10 +858,6 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f // Add one for actor cursor object id for tasks. const int num_returns = task_options.num_returns + 1; - const bool is_direct_call = actor_handle->IsDirectCallActor(); - const TaskTransportType transport_type = - is_direct_call ? TaskTransportType::DIRECT : TaskTransportType::RAYLET; - // Build common task spec. TaskSpecBuilder builder; const int next_task_index = worker_context_.GetNextTaskIndex(); @@ -889,28 +868,24 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f BuildCommonTaskSpec(builder, actor_handle->CreationJobID(), actor_task_id, worker_context_.GetCurrentTaskID(), next_task_index, GetCallerId(), rpc_address_, function, args, num_returns, task_options.resources, - required_resources, transport_type, return_ids); + required_resources, return_ids); const ObjectID new_cursor = return_ids->back(); - actor_handle->SetActorTaskSpec(builder, transport_type, new_cursor); + actor_handle->SetActorTaskSpec(builder, new_cursor); // Remove cursor from return ids. return_ids->pop_back(); // Submit task. Status status; TaskSpecification task_spec = builder.Build(); - if (is_direct_call) { - task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, - CurrentCallSite()); - if (actor_handle->IsDead()) { - auto status = Status::IOError("sent task to dead actor"); - task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED, - &status); - } else { - status = direct_actor_submitter_->SubmitTask(task_spec); - } + task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, + CurrentCallSite()); + if (actor_handle->IsDead()) { + auto status = Status::IOError("sent task to dead actor"); + task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED, + &status); } else { - RAY_CHECK_OK(local_raylet_client_->SubmitTask(task_spec)); + status = direct_actor_submitter_->SubmitTask(task_spec); } return status; } @@ -919,7 +894,6 @@ Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill, bool no_reconstruction) { ActorHandle *actor_handle = nullptr; RAY_RETURN_NOT_OK(GetActorHandle(actor_id, &actor_handle)); - RAY_CHECK(actor_handle->IsDirectCallActor()); direct_actor_submitter_->KillActor(actor_id, force_kill, no_reconstruction); return Status::OK(); } @@ -976,14 +950,9 @@ bool CoreWorker::AddActorHandle(std::unique_ptr actor_handle, absl::MutexLock lock(&actor_handles_mutex_); auto it = actor_handles_.find(actor_id); RAY_CHECK(it != actor_handles_.end()); - if (it->second->IsDirectCallActor()) { - // We have to reset the actor handle since the next instance of the - // actor will not have the last sequence number that we sent. - // TODO: Remove the check for direct calls. We do not reset for the - // raylet codepath because it tries to replay all tasks since the - // last actor checkpoint. - it->second->Reset(); - } + // We have to reset the actor handle since the next instance of the + // actor will not have the last sequence number that we sent. + it->second->Reset(); direct_actor_submitter_->DisconnectActor(actor_id, false); } else if (actor_data.state() == gcs::ActorTableData::DEAD) { direct_actor_submitter_->DisconnectActor(actor_id, true); @@ -1073,9 +1042,8 @@ Status CoreWorker::AllocateReturnObjects( } // Allocate a buffer for the return object. - if (worker_context_.CurrentTaskIsDirectCall() && - static_cast(data_sizes[i]) < - RayConfig::instance().max_direct_call_object_size()) { + if (static_cast(data_sizes[i]) < + RayConfig::instance().max_direct_call_object_size()) { data_buffer = std::make_shared(data_sizes[i]); } else { RAY_RETURN_NOT_OK( @@ -1128,12 +1096,9 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, reference_counter_->AddLocalReference(borrowed_id, task_spec.CallSiteString()); } - const auto transport_type = worker_context_.CurrentTaskIsDirectCall() - ? TaskTransportType::DIRECT - : TaskTransportType::RAYLET; std::vector return_ids; for (size_t i = 0; i < task_spec.NumReturns(); i++) { - return_ids.push_back(task_spec.ReturnId(i, transport_type)); + return_ids.push_back(task_spec.ReturnId(i, TaskTransportType::DIRECT)); } Status status; @@ -1173,11 +1138,6 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to seal object " << return_ids[i] << " in store: " << status.message(); } - } else if (!worker_context_.CurrentTaskIsDirectCall()) { - if (!Put(*return_objects->at(i), {}, return_ids[i]).ok()) { - RAY_LOG(FATAL) << "Task " << task_spec.TaskId() << " failed to put object " - << return_ids[i] << " in store: " << status.message(); - } } } @@ -1305,16 +1265,10 @@ void CoreWorker::HandleAssignTask(const rpc::AssignTaskRequest &request, return; } - if (worker_context_.CurrentActorIsDirectCall()) { - send_reply_callback(Status::Invalid("This actor only accepts direct calls."), nullptr, - nullptr); - return; - } else { - task_queue_length_ += 1; - task_execution_service_.post([=] { - raylet_task_receiver_->HandleAssignTask(request, reply, send_reply_callback); - }); - } + task_queue_length_ += 1; + task_execution_service_.post([=] { + raylet_task_receiver_->HandleAssignTask(request, reply, send_reply_callback); + }); } void CoreWorker::HandlePushTask(const rpc::PushTaskRequest &request, diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc index f03ca920c..7116b6684 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc @@ -85,7 +85,7 @@ inline ray::TaskOptions ToTaskOptions(JNIEnv *env, jint numReturns, jobject call resources = ToResources(env, java_resources); } - ray::TaskOptions task_options{numReturns, /*use_direct_call=*/true, resources}; + ray::TaskOptions task_options{numReturns, resources}; return task_options; } @@ -113,7 +113,6 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env, ray::ActorCreationOptions actor_creation_options{ static_cast(max_reconstructions), - /*use_direct_call=*/true, static_cast(max_concurrency), resources, resources, diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index ab769508d..a921e1bfd 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -59,7 +59,7 @@ static void flushall_redis(void) { ActorID CreateActorHelper(CoreWorker &worker, std::unordered_map &resources, - bool is_direct_call, uint64_t max_reconstructions) { + uint64_t max_reconstructions) { std::unique_ptr actor_handle; uint8_t array[] = {1, 2, 3}; @@ -71,8 +71,8 @@ ActorID CreateActorHelper(CoreWorker &worker, args.emplace_back(TaskArg::PassByValue( std::make_shared(buffer, nullptr, std::vector()))); - ActorCreationOptions actor_options{max_reconstructions, is_direct_call, - /*max_concurrency*/ 1, resources, resources, {}, + ActorCreationOptions actor_options{max_reconstructions, + /*max_concurrency*/ 1, resources, resources, {}, /*is_detached*/ false, /*is_asyncio*/ false}; @@ -258,20 +258,17 @@ class CoreWorkerTest : public ::testing::Test { void TestNormalTask(std::unordered_map &resources); // Test actor tasks. - void TestActorTask(std::unordered_map &resources, - bool is_direct_call); + void TestActorTask(std::unordered_map &resources); // Test actor failure case, verify that the tasks would either succeed or // fail with exceptions, in that case the return objects fetched from `Get` // contain errors. - void TestActorFailure(std::unordered_map &resources, - bool is_direct_call); + void TestActorFailure(std::unordered_map &resources); // Test actor failover case. Verify that actor can be reconstructed successfully, // and as long as we wait for actor reconstruction before submitting new tasks, // it is guaranteed that all tasks are successfully completed. - void TestActorReconstruction(std::unordered_map &resources, - bool is_direct_call); + void TestActorReconstruction(std::unordered_map &resources); protected: bool WaitForDirectCallActorState(CoreWorker &worker, const ActorID &actor_id, @@ -279,8 +276,7 @@ class CoreWorkerTest : public ::testing::Test { // Get the pid for the worker process that runs the actor. int GetActorPid(CoreWorker &worker, const ActorID &actor_id, - std::unordered_map &resources, - bool is_direct_call); + std::unordered_map &resources); std::vector raylet_socket_names_; std::vector raylet_store_socket_names_; @@ -301,10 +297,9 @@ bool CoreWorkerTest::WaitForDirectCallActorState(CoreWorker &worker, } int CoreWorkerTest::GetActorPid(CoreWorker &worker, const ActorID &actor_id, - std::unordered_map &resources, - bool is_direct_call) { + std::unordered_map &resources) { std::vector args; - TaskOptions options{1, is_direct_call, resources}; + TaskOptions options{1, resources}; std::vector return_ids; RayFunction func{Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "GetWorkerPid", "", "", "")}; @@ -348,8 +343,6 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map &res RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); TaskOptions options; - options.is_direct_call = true; - std::vector return_ids; RAY_CHECK_OK( driver.SubmitTask(func, args, options, &return_ids, /*max_retries=*/0)); @@ -370,13 +363,12 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map &res } } -void CoreWorkerTest::TestActorTask(std::unordered_map &resources, - bool is_direct_call) { +void CoreWorkerTest::TestActorTask(std::unordered_map &resources) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1", node_manager_port, nullptr); - auto actor_id = CreateActorHelper(driver, resources, is_direct_call, 1000); + auto actor_id = CreateActorHelper(driver, resources, 1000); // Test submitting some tasks with by-value args for that actor. { @@ -392,7 +384,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso args.emplace_back(TaskArg::PassByValue( std::make_shared(buffer2, nullptr, std::vector()))); - TaskOptions options{1, false, resources}; + TaskOptions options{1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -401,7 +393,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso ASSERT_EQ(return_ids.size(), 1); ASSERT_TRUE(return_ids[0].IsReturnObject()); ASSERT_EQ(static_cast(return_ids[0].GetTransportType()), - is_direct_call ? TaskTransportType::DIRECT : TaskTransportType::RAYLET); + TaskTransportType::DIRECT); std::vector> results; RAY_CHECK_OK(driver.Get(return_ids, -1, &results)); @@ -437,7 +429,7 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso args.emplace_back(TaskArg::PassByValue( std::make_shared(buffer2, nullptr, std::vector()))); - TaskOptions options{1, false, resources}; + TaskOptions options{1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -459,19 +451,19 @@ void CoreWorkerTest::TestActorTask(std::unordered_map &reso } void CoreWorkerTest::TestActorReconstruction( - std::unordered_map &resources, bool is_direct_call) { + std::unordered_map &resources) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1", node_manager_port, nullptr); // creating actor. - auto actor_id = CreateActorHelper(driver, resources, is_direct_call, 1000); + auto actor_id = CreateActorHelper(driver, resources, 1000); // Wait for actor alive event. ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_id, true, 30 * 1000 /* 30s */)); RAY_LOG(INFO) << "actor has been created"; - auto pid = GetActorPid(driver, actor_id, resources, is_direct_call); + auto pid = GetActorPid(driver, actor_id, resources); RAY_CHECK(pid != -1); // Test submitting some tasks with by-value args for that actor. @@ -485,9 +477,9 @@ void CoreWorkerTest::TestActorReconstruction( ASSERT_EQ(system("pkill mock_worker"), 0); // Wait for actor restruction event, and then for alive event. - auto check_actor_restart_func = [this, pid, &driver, &actor_id, &resources, - is_direct_call]() -> bool { - auto new_pid = GetActorPid(driver, actor_id, resources, is_direct_call); + auto check_actor_restart_func = [this, pid, &driver, &actor_id, + &resources]() -> bool { + auto new_pid = GetActorPid(driver, actor_id, resources); return new_pid != -1 && new_pid != pid; }; ASSERT_TRUE(WaitForCondition(check_actor_restart_func, 30 * 1000 /* 30s */)); @@ -503,7 +495,7 @@ void CoreWorkerTest::TestActorReconstruction( args.emplace_back(TaskArg::PassByValue( std::make_shared(buffer1, nullptr, std::vector()))); - TaskOptions options{1, false, resources}; + TaskOptions options{1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -520,15 +512,14 @@ void CoreWorkerTest::TestActorReconstruction( } } -void CoreWorkerTest::TestActorFailure(std::unordered_map &resources, - bool is_direct_call) { +void CoreWorkerTest::TestActorFailure( + std::unordered_map &resources) { CoreWorker driver(WorkerType::DRIVER, Language::PYTHON, raylet_store_socket_names_[0], raylet_socket_names_[0], NextJobId(), gcs_options_, "", "127.0.0.1", node_manager_port, nullptr); // creating actor. - auto actor_id = - CreateActorHelper(driver, resources, is_direct_call, 0 /* not reconstructable */); + auto actor_id = CreateActorHelper(driver, resources, 0 /* not reconstructable */); // Test submitting some tasks with by-value args for that actor. { @@ -549,7 +540,7 @@ void CoreWorkerTest::TestActorFailure(std::unordered_map &r args.emplace_back(TaskArg::PassByValue( std::make_shared(buffer1, nullptr, std::vector()))); - TaskOptions options{1, false, resources}; + TaskOptions options{1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -626,7 +617,6 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { std::unordered_map resources; ActorCreationOptions actor_options{0, - /*is_direct_call*/ true, 1, resources, resources, @@ -636,8 +626,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { const auto job_id = NextJobId(); ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), - function.GetLanguage(), true, function.GetFunctionDescriptor(), - ""); + function.GetLanguage(), function.GetFunctionDescriptor(), ""); // Manually create `num_tasks` task specs, and for each of them create a // `PushTaskRequest`, this is to batch performance of TaskSpec @@ -647,7 +636,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { RAY_LOG(INFO) << "start creating " << num_tasks << " PushTaskRequests"; rpc::Address address; for (int i = 0; i < num_tasks; i++) { - TaskOptions options{1, false, resources}; + TaskOptions options{1, resources}; std::vector return_ids; auto num_returns = options.num_returns; @@ -665,8 +654,7 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { } } - actor_handle.SetActorTaskSpec(builder, TaskTransportType::RAYLET, - ObjectID::FromRandom()); + actor_handle.SetActorTaskSpec(builder, ObjectID::FromRandom()); auto task_spec = builder.Build(); @@ -686,7 +674,6 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { // Create an actor. std::unordered_map resources; auto actor_id = CreateActorHelper(driver, resources, - /*is_direct_call=*/true, /*max_reconstructions=*/0); // wait for actor creation finish. ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_id, true, 30 * 1000 /* 30s */)); @@ -703,7 +690,7 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { args.emplace_back(TaskArg::PassByValue( std::make_shared(buffer, nullptr, std::vector()))); - TaskOptions options{1, false, resources}; + TaskOptions options{1, resources}; std::vector return_ids; RayFunction func(ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "MergeInputArgsAsOutput", "", "", "")); @@ -753,7 +740,7 @@ TEST_F(ZeroNodeTest, TestActorHandle) { JobID job_id = NextJobId(); ActorHandle original(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 0), TaskID::Nil(), rpc::Address(), job_id, ObjectID::FromRandom(), - Language::PYTHON, /*is_direct_call=*/false, + Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython("", "", "", ""), ""); std::string output; original.Serialize(&output); @@ -982,46 +969,35 @@ TEST_F(TwoNodeTest, TestNormalTaskCrossNodes) { TEST_F(SingleNodeTest, TestActorTaskLocal) { std::unordered_map resources; - TestActorTask(resources, false); + TestActorTask(resources); } TEST_F(TwoNodeTest, TestActorTaskCrossNodes) { std::unordered_map resources; resources.emplace("resource1", 1); - TestActorTask(resources, false); + TestActorTask(resources); } -TEST_F(SingleNodeTest, TestDirectActorTaskLocal) { +TEST_F(SingleNodeTest, TestActorTaskLocalReconstruction) { std::unordered_map resources; - TestActorTask(resources, true); + TestActorReconstruction(resources); } -TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodes) { +TEST_F(TwoNodeTest, TestActorTaskCrossNodesReconstruction) { std::unordered_map resources; resources.emplace("resource1", 1); - TestActorTask(resources, true); + TestActorReconstruction(resources); } -TEST_F(SingleNodeTest, TestDirectActorTaskLocalReconstruction) { +TEST_F(SingleNodeTest, TestActorTaskLocalFailure) { std::unordered_map resources; - TestActorReconstruction(resources, true); + TestActorFailure(resources); } -TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesReconstruction) { +TEST_F(TwoNodeTest, TestActorTaskCrossNodesFailure) { std::unordered_map resources; resources.emplace("resource1", 1); - TestActorReconstruction(resources, true); -} - -TEST_F(SingleNodeTest, TestDirectActorTaskLocalFailure) { - std::unordered_map resources; - TestActorFailure(resources, true); -} - -TEST_F(TwoNodeTest, TestDirectActorTaskCrossNodesFailure) { - std::unordered_map resources; - resources.emplace("resource1", 1); - TestActorFailure(resources, true); + TestActorFailure(resources); } } // namespace ray diff --git a/src/ray/core_worker/transport/direct_actor_transport.cc b/src/ray/core_worker/transport/direct_actor_transport.cc index 75ca0e0f8..b424f9294 100644 --- a/src/ray/core_worker/transport/direct_actor_transport.cc +++ b/src/ray/core_worker/transport/direct_actor_transport.cc @@ -219,12 +219,6 @@ void CoreWorkerDirectTaskReceiver::HandlePushTask( rpc::SendReplyCallback send_reply_callback) { RAY_CHECK(waiter_ != nullptr) << "Must call init() prior to use"; const TaskSpecification task_spec(request.task_spec()); - if (task_spec.IsActorTask() && !worker_context_.CurrentTaskIsDirectCall()) { - send_reply_callback(Status::Invalid("This actor doesn't accept direct calls."), - nullptr, nullptr); - return; - } - std::vector dependencies; for (size_t i = 0; i < task_spec.NumArgs(); ++i) { int count = task_spec.ArgIdCount(i); diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index d9a3a2fd5..8c927b5c9 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -48,11 +48,8 @@ message ActorHandle { // TODO: Remove this once scheduling is done by task counter only. bytes actor_cursor = 7; - // Whether direct actor call is used. - bool is_direct_call = 8; - // An extension field that is used for storing app-language-specific data. - bytes extension_data = 9; + bytes extension_data = 8; } message AssignTaskRequest { @@ -101,7 +98,6 @@ message PushTaskRequest { // will guarantee tasks execute in this sequence, waiting for any // out-of-order request messages to arrive as necessary. // If set to -1, ordering is disabled and the task executes immediately. - // This mode of behaviour is used for direct task submission only. int64 sequence_number = 4; // The max sequence number the client has processed responses for. This // is a performance optimization that allows the client to tell the server diff --git a/streaming/src/queue/transport.cc b/streaming/src/queue/transport.cc index d764a5f09..95f5c11d4 100644 --- a/streaming/src/queue/transport.cc +++ b/streaming/src/queue/transport.cc @@ -11,7 +11,7 @@ void Transport::SendInternal(std::shared_ptr buffer, RayFunction &function, int return_num, std::vector &return_ids) { std::unordered_map resources; - TaskOptions options{return_num, true, resources}; + TaskOptions options{return_num, resources}; char meta_data[3] = {'R', 'A', 'W'}; std::shared_ptr meta = diff --git a/streaming/src/test/queue_tests_base.h b/streaming/src/test/queue_tests_base.h index e0839ccd7..34e5455dd 100644 --- a/streaming/src/test/queue_tests_base.h +++ b/streaming/src/test/queue_tests_base.h @@ -72,9 +72,8 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { } std::string StartStore() { - std::string store_socket_name = ray::JoinPaths( - ray::GetUserTempDir(), - "store" + RandomObjectID().Hex()); + std::string store_socket_name = + ray::JoinPaths(ray::GetUserTempDir(), "store" + RandomObjectID().Hex()); std::string store_pid = store_socket_name + ".pid"; std::string plasma_command = store_executable_ + " -m 10000000 -s " + store_socket_name + @@ -96,8 +95,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { std::string StartGcsServer(std::string redis_address) { std::string gcs_server_socket_name = ray::JoinPaths( - ray::GetUserTempDir(), - "gcs_server" + ObjectID::FromRandom().Hex()); + ray::GetUserTempDir(), "gcs_server" + ObjectID::FromRandom().Hex()); std::string ray_start_cmd = gcs_server_executable_; ray_start_cmd.append(" --redis_address=" + redis_address) .append(" --redis_port=6379") @@ -122,9 +120,8 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { std::string StartRaylet(std::string store_socket_name, std::string node_ip_address, int port, std::string redis_address, std::string resource) { - std::string raylet_socket_name = ray::JoinPaths( - ray::GetUserTempDir(), - "raylet" + RandomObjectID().Hex()); + std::string raylet_socket_name = + ray::JoinPaths(ray::GetUserTempDir(), "raylet" + RandomObjectID().Hex()); std::string ray_start_cmd = raylet_executable_; ray_start_cmd.append(" --raylet_socket_name=" + raylet_socket_name) .append(" --store_socket_name=" + store_socket_name) @@ -174,7 +171,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { args.emplace_back(TaskArg::PassByValue(std::make_shared( msg.ToBytes(), nullptr, std::vector(), true))); std::unordered_map resources; - TaskOptions options{0, true, resources}; + TaskOptions options{0, resources}; std::vector return_ids; RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython("init", "", "", "")}; @@ -189,7 +186,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { args.emplace_back(TaskArg::PassByValue( std::make_shared(buffer, nullptr, std::vector(), true))); std::unordered_map resources; - TaskOptions options{0, true, resources}; + TaskOptions options{0, resources}; std::vector return_ids; RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "execute_test", test, "", "")}; @@ -204,7 +201,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { args.emplace_back(TaskArg::PassByValue( std::make_shared(buffer, nullptr, std::vector(), true))); std::unordered_map resources; - TaskOptions options{1, true, resources}; + TaskOptions options{1, resources}; std::vector return_ids; RayFunction func{ray::Language::PYTHON, ray::FunctionDescriptorBuilder::BuildPython( "check_current_test_status", "", "", "")}; @@ -274,7 +271,7 @@ class StreamingQueueTestBase : public ::testing::TestWithParam { std::make_shared(buffer, nullptr, std::vector()))); ActorCreationOptions actor_options{ - max_reconstructions, is_direct_call, + max_reconstructions, /*max_concurrency*/ 1, resources, resources, {}, /*is_detached*/ false, /*is_asyncio*/ false};