From eefea4e29c458a35db20a0aa4ca02e6970132954 Mon Sep 17 00:00:00 2001 From: Stephanie Wang Date: Mon, 20 Apr 2020 19:13:50 -0700 Subject: [PATCH] [core] Post task submission to IO loop (#8090) * Post to IO loop * Unused * Fix build --- python/ray/_raylet.pyx | 4 +- python/ray/includes/libcoreworker.pxd | 2 +- src/ray/core_worker/core_worker.cc | 42 +++++++++---------- src/ray/core_worker/core_worker.h | 12 +++--- ...io_ray_runtime_task_NativeTaskSubmitter.cc | 8 ++-- src/ray/core_worker/test/core_worker_test.cc | 3 +- 6 files changed, 33 insertions(+), 38 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index e537df4da..f8e46d035 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -868,9 +868,9 @@ cdef class CoreWorker: prepare_args(self, language, args, &args_vector) with nogil: - check_status(CCoreWorkerProcess.GetCoreWorker().SubmitTask( + CCoreWorkerProcess.GetCoreWorker().SubmitTask( ray_function, args_vector, task_options, &return_ids, - max_retries)) + max_retries) return VectorToObjectIDs(return_ids) diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 477678997..e4759095a 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -82,7 +82,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CWorkerType &GetWorkerType() CLanguage &GetLanguage() - CRayStatus SubmitTask( + void SubmitTask( const CRayFunction &function, const c_vector[CTaskArg] &args, const CTaskOptions &options, c_vector[CObjectID] *return_ids, int max_retries) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index bb7c7ea51..3c4042a0b 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1086,10 +1086,9 @@ Status CoreWorker::SetResource(const std::string &resource_name, const double ca return local_raylet_client_->SetResource(resource_name, capacity, client_id); } -Status CoreWorker::SubmitTask(const RayFunction &function, - const std::vector &args, - const TaskOptions &task_options, - std::vector *return_ids, int max_retries) { +void CoreWorker::SubmitTask(const RayFunction &function, const std::vector &args, + const TaskOptions &task_options, + std::vector *return_ids, int max_retries) { TaskSpecBuilder builder; const int next_task_index = worker_context_.GetNextTaskIndex(); const auto task_id = @@ -1104,11 +1103,13 @@ Status CoreWorker::SubmitTask(const RayFunction &function, task_options.resources, required_resources, return_ids); TaskSpecification task_spec = builder.Build(); if (options_.is_local_mode) { - return ExecuteTaskLocalMode(task_spec); + ExecuteTaskLocalMode(task_spec); } else { task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, CurrentCallSite(), max_retries); - return direct_task_submitter_->SubmitTask(task_spec); + io_service_.post([this, task_spec]() { + RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec)); + }); } } @@ -1139,7 +1140,7 @@ Status CoreWorker::CreateActor(const RayFunction &function, TaskSpecification task_spec = builder.Build(); Status status; if (options_.is_local_mode) { - status = ExecuteTaskLocalMode(task_spec); + ExecuteTaskLocalMode(task_spec); } else { task_manager_->AddPendingTask( GetCallerId(), rpc_address_, task_spec, CurrentCallSite(), @@ -1187,16 +1188,17 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f Status status; TaskSpecification task_spec = builder.Build(); if (options_.is_local_mode) { - return ExecuteTaskLocalMode(task_spec, actor_id); - } - task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, - CurrentCallSite()); - if (actor_handle->IsDead()) { - auto status = Status::IOError("sent task to dead actor"); - task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED, - &status); + ExecuteTaskLocalMode(task_spec, actor_id); } else { - status = direct_actor_submitter_->SubmitTask(task_spec); + task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, + CurrentCallSite()); + if (actor_handle->IsDead()) { + auto status = Status::IOError("sent task to dead actor"); + task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED, + &status); + } else { + status = direct_actor_submitter_->SubmitTask(task_spec); + } } return status; } @@ -1507,8 +1509,8 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, return status; } -Status CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec, - const ActorID &actor_id) { +void CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec, + const ActorID &actor_id) { auto resource_ids = std::make_shared(); auto return_objects = std::vector>(); auto borrowed_refs = ReferenceCounter::ReferenceTableProto(); @@ -1520,10 +1522,8 @@ Status CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec, } auto old_id = GetActorId(); SetActorId(actor_id); - auto status = ExecuteTask(task_spec, resource_ids, &return_objects, &borrowed_refs); + RAY_UNUSED(ExecuteTask(task_spec, resource_ids, &return_objects, &borrowed_refs)); SetActorId(old_id); - // TODO(ilr): Maybe not necessary - return status; } Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task, diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 733f91484..538f6f60e 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -543,10 +543,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// \param[in] args Arguments of this task. /// \param[in] task_options Options for this task. /// \param[out] return_ids Ids of the return objects. - /// \return Status error if task submission fails, likely due to raylet failure. - Status SubmitTask(const RayFunction &function, const std::vector &args, - const TaskOptions &task_options, std::vector *return_ids, - int max_retries); + void SubmitTask(const RayFunction &function, const std::vector &args, + const TaskOptions &task_options, std::vector *return_ids, + int max_retries); /// Create an actor. /// @@ -821,9 +820,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Execute a local mode task (runs normal ExecuteTask) /// /// \param spec[in] task_spec Task specification. - /// \return Status. - Status ExecuteTaskLocalMode(const TaskSpecification &task_spec, - const ActorID &actor_id = ActorID::Nil()); + void ExecuteTaskLocalMode(const TaskSpecification &task_spec, + const ActorID &actor_id = ActorID::Nil()); /// Build arguments for task executor. This would loop through all the arguments /// in task spec, and for each of them that's passed by reference (ObjectID), diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index 5c07706e1..bb94d153e 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -131,11 +131,9 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSub std::vector return_ids; // TODO (kfstorm): Allow setting `max_retries` via `CallOptions`. - auto status = ray::CoreWorkerProcess::GetCoreWorker().SubmitTask( - ray_function, task_args, task_options, &return_ids, - /*max_retries=*/0); - - THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr); + ray::CoreWorkerProcess::GetCoreWorker().SubmitTask(ray_function, task_args, + task_options, &return_ids, + /*max_retries=*/0); return NativeIdVectorToJavaByteArrayList(env, return_ids); } diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index 7f5369710..d4c167d63 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -376,8 +376,7 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map &res "MergeInputArgsAsOutput", "", "", "")); TaskOptions options; std::vector return_ids; - RAY_CHECK_OK( - driver.SubmitTask(func, args, options, &return_ids, /*max_retries=*/0)); + driver.SubmitTask(func, args, options, &return_ids, /*max_retries=*/0); ASSERT_EQ(return_ids.size(), 1);