[core] Post task submission to IO loop (#8090)

* Post to IO loop

* Unused

* Fix build
This commit is contained in:
Stephanie Wang
2020-04-20 19:13:50 -07:00
committed by GitHub
parent 708dff6d8f
commit eefea4e29c
6 changed files with 33 additions and 38 deletions
+2 -2
View File
@@ -868,9 +868,9 @@ cdef class CoreWorker:
prepare_args(self, language, args, &args_vector)
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker().SubmitTask(
CCoreWorkerProcess.GetCoreWorker().SubmitTask(
ray_function, args_vector, task_options, &return_ids,
max_retries))
max_retries)
return VectorToObjectIDs(return_ids)
+1 -1
View File
@@ -82,7 +82,7 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CWorkerType &GetWorkerType()
CLanguage &GetLanguage()
CRayStatus SubmitTask(
void SubmitTask(
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CTaskOptions &options, c_vector[CObjectID] *return_ids,
int max_retries)
+21 -21
View File
@@ -1086,10 +1086,9 @@ Status CoreWorker::SetResource(const std::string &resource_name, const double ca
return local_raylet_client_->SetResource(resource_name, capacity, client_id);
}
Status CoreWorker::SubmitTask(const RayFunction &function,
const std::vector<TaskArg> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids, int max_retries) {
void CoreWorker::SubmitTask(const RayFunction &function, const std::vector<TaskArg> &args,
const TaskOptions &task_options,
std::vector<ObjectID> *return_ids, int max_retries) {
TaskSpecBuilder builder;
const int next_task_index = worker_context_.GetNextTaskIndex();
const auto task_id =
@@ -1104,11 +1103,13 @@ Status CoreWorker::SubmitTask(const RayFunction &function,
task_options.resources, required_resources, return_ids);
TaskSpecification task_spec = builder.Build();
if (options_.is_local_mode) {
return ExecuteTaskLocalMode(task_spec);
ExecuteTaskLocalMode(task_spec);
} else {
task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec,
CurrentCallSite(), max_retries);
return direct_task_submitter_->SubmitTask(task_spec);
io_service_.post([this, task_spec]() {
RAY_UNUSED(direct_task_submitter_->SubmitTask(task_spec));
});
}
}
@@ -1139,7 +1140,7 @@ Status CoreWorker::CreateActor(const RayFunction &function,
TaskSpecification task_spec = builder.Build();
Status status;
if (options_.is_local_mode) {
status = ExecuteTaskLocalMode(task_spec);
ExecuteTaskLocalMode(task_spec);
} else {
task_manager_->AddPendingTask(
GetCallerId(), rpc_address_, task_spec, CurrentCallSite(),
@@ -1187,16 +1188,17 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f
Status status;
TaskSpecification task_spec = builder.Build();
if (options_.is_local_mode) {
return ExecuteTaskLocalMode(task_spec, actor_id);
}
task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec,
CurrentCallSite());
if (actor_handle->IsDead()) {
auto status = Status::IOError("sent task to dead actor");
task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED,
&status);
ExecuteTaskLocalMode(task_spec, actor_id);
} else {
status = direct_actor_submitter_->SubmitTask(task_spec);
task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec,
CurrentCallSite());
if (actor_handle->IsDead()) {
auto status = Status::IOError("sent task to dead actor");
task_manager_->PendingTaskFailed(task_spec.TaskId(), rpc::ErrorType::ACTOR_DIED,
&status);
} else {
status = direct_actor_submitter_->SubmitTask(task_spec);
}
}
return status;
}
@@ -1507,8 +1509,8 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec,
return status;
}
Status CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec,
const ActorID &actor_id) {
void CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec,
const ActorID &actor_id) {
auto resource_ids = std::make_shared<ResourceMappingType>();
auto return_objects = std::vector<std::shared_ptr<RayObject>>();
auto borrowed_refs = ReferenceCounter::ReferenceTableProto();
@@ -1520,10 +1522,8 @@ Status CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec,
}
auto old_id = GetActorId();
SetActorId(actor_id);
auto status = ExecuteTask(task_spec, resource_ids, &return_objects, &borrowed_refs);
RAY_UNUSED(ExecuteTask(task_spec, resource_ids, &return_objects, &borrowed_refs));
SetActorId(old_id);
// TODO(ilr): Maybe not necessary
return status;
}
Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task,
+5 -7
View File
@@ -543,10 +543,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// \param[in] args Arguments of this task.
/// \param[in] task_options Options for this task.
/// \param[out] return_ids Ids of the return objects.
/// \return Status error if task submission fails, likely due to raylet failure.
Status SubmitTask(const RayFunction &function, const std::vector<TaskArg> &args,
const TaskOptions &task_options, std::vector<ObjectID> *return_ids,
int max_retries);
void SubmitTask(const RayFunction &function, const std::vector<TaskArg> &args,
const TaskOptions &task_options, std::vector<ObjectID> *return_ids,
int max_retries);
/// Create an actor.
///
@@ -821,9 +820,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler {
/// Execute a local mode task (runs normal ExecuteTask)
///
/// \param spec[in] task_spec Task specification.
/// \return Status.
Status ExecuteTaskLocalMode(const TaskSpecification &task_spec,
const ActorID &actor_id = ActorID::Nil());
void ExecuteTaskLocalMode(const TaskSpecification &task_spec,
const ActorID &actor_id = ActorID::Nil());
/// Build arguments for task executor. This would loop through all the arguments
/// in task spec, and for each of them that's passed by reference (ObjectID),
@@ -131,11 +131,9 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_task_NativeTaskSubmitter_nativeSub
std::vector<ObjectID> return_ids;
// TODO (kfstorm): Allow setting `max_retries` via `CallOptions`.
auto status = ray::CoreWorkerProcess::GetCoreWorker().SubmitTask(
ray_function, task_args, task_options, &return_ids,
/*max_retries=*/0);
THROW_EXCEPTION_AND_RETURN_IF_NOT_OK(env, status, nullptr);
ray::CoreWorkerProcess::GetCoreWorker().SubmitTask(ray_function, task_args,
task_options, &return_ids,
/*max_retries=*/0);
return NativeIdVectorToJavaByteArrayList(env, return_ids);
}
+1 -2
View File
@@ -376,8 +376,7 @@ void CoreWorkerTest::TestNormalTask(std::unordered_map<std::string, double> &res
"MergeInputArgsAsOutput", "", "", ""));
TaskOptions options;
std::vector<ObjectID> return_ids;
RAY_CHECK_OK(
driver.SubmitTask(func, args, options, &return_ids, /*max_retries=*/0));
driver.SubmitTask(func, args, options, &return_ids, /*max_retries=*/0);
ASSERT_EQ(return_ids.size(), 1);