Move task to common module and add checks in getter methods (#5147)

This commit is contained in:
Hao Chen
2019-07-11 17:07:04 +08:00
committed by GitHub
parent d8b50a5018
commit fd835d107e
51 changed files with 346 additions and 361 deletions
+2 -2
View File
@@ -100,8 +100,8 @@ cdef extern from "ray/protobuf/common.pb.h" namespace "Language" nogil:
cdef CLanguage LANGUAGE_JAVA "Language::JAVA"
cdef extern from "ray/raylet/scheduling_resources.h" \
namespace "ray::raylet" nogil:
cdef extern from "ray/common/task/scheduling_resources.h" \
namespace "ray" nogil:
cdef cppclass ResourceSet "ResourceSet":
ResourceSet()
ResourceSet(const unordered_map[c_string, double] &resource_map)
+15 -15
View File
@@ -35,8 +35,8 @@ cdef extern from "ray/protobuf/gcs.pb.h" namespace "ray::rpc" nogil:
const c_string &SerializeAsString()
cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil:
cdef cppclass CTaskSpec "ray::raylet::TaskSpecification":
cdef extern from "ray/common/task/task_spec.h" namespace "ray" nogil:
cdef cppclass CTaskSpec "ray::TaskSpecification":
CTaskSpec(const RpcTaskSpec message)
CTaskSpec(const c_string &serialized_binary)
const RpcTaskSpec &GetMessage()
@@ -61,7 +61,7 @@ cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil:
const ResourceSet GetRequiredPlacementResources() const
c_bool IsDriverTask() const
CLanguage GetLanguage() const
c_bool IsNormalTask() const
c_bool IsActorCreationTask() const
c_bool IsActorTask() const
CActorID ActorCreationId() const
@@ -74,38 +74,38 @@ cdef extern from "ray/raylet/task_spec.h" namespace "ray::raylet" nogil:
c_vector[CActorHandleID] NewActorHandles() const
cdef extern from "ray/raylet/task_util.h" namespace "ray::raylet" nogil:
cdef cppclass TaskSpecBuilder "ray::raylet::TaskSpecBuilder":
cdef extern from "ray/common/task/task_util.h" namespace "ray" nogil:
cdef cppclass TaskSpecBuilder "ray::TaskSpecBuilder":
TaskSpecBuilder &SetCommonTaskSpec(
const CLanguage &language, const c_vector[c_string] &function_descriptor,
const CJobID &job_id, const CTaskID &parent_task_id, uint64_t parent_counter,
uint64_t num_returns, const unordered_map[c_string, double] &required_resources,
const unordered_map[c_string, double] &required_placement_resources);
const unordered_map[c_string, double] &required_placement_resources)
TaskSpecBuilder &AddByRefArg(const CObjectID &arg_id);
TaskSpecBuilder &AddByRefArg(const CObjectID &arg_id)
TaskSpecBuilder &AddByValueArg(const c_string &data);
TaskSpecBuilder &AddByValueArg(const c_string &data)
TaskSpecBuilder &SetActorCreationTaskSpec(
const CActorID &actor_id, uint64_t max_reconstructions,
const c_vector[c_string] &dynamic_worker_options);
const c_vector[c_string] &dynamic_worker_options)
TaskSpecBuilder &SetActorTaskSpec(
const CActorID &actor_id, const CActorHandleID &actor_handle_id,
const CObjectID &actor_creation_dummy_object_id, uint64_t actor_counter,
const c_vector[CActorHandleID] &new_handle_ids);
const c_vector[CActorHandleID] &new_handle_ids)
RpcTaskSpec GetMessage();
RpcTaskSpec GetMessage()
cdef extern from "ray/raylet/task_execution_spec.h" namespace "ray::raylet" nogil:
cdef cppclass CTaskExecutionSpec "ray::raylet::TaskExecutionSpecification":
cdef extern from "ray/common/task/task_execution_spec.h" namespace "ray" nogil:
cdef cppclass CTaskExecutionSpec "ray::TaskExecutionSpecification":
CTaskExecutionSpec(RpcTaskExecutionSpec message)
CTaskExecutionSpec(const c_string &serialized_binary)
const RpcTaskExecutionSpec &GetMessage()
c_vector[CObjectID] ExecutionDependencies()
uint64_t NumForwards()
cdef extern from "ray/raylet/task.h" namespace "ray::raylet" nogil:
cdef cppclass CTask "ray::raylet::Task":
cdef extern from "ray/common/task/task.h" namespace "ray" nogil:
cdef cppclass CTask "ray::Task":
CTask(CTaskSpec task_spec, CTaskExecutionSpec task_execution_spec)
+23 -3
View File
@@ -15,7 +15,7 @@ from ray.includes.task cimport (
cdef class TaskSpec:
"""Cython wrapper class of C++ `ray::raylet::TaskSpecification`."""
"""Cython wrapper class of C++ `ray::TaskSpecification`."""
cdef:
unique_ptr[CTaskSpec] task_spec
@@ -121,6 +121,18 @@ cdef class TaskSpec:
"""
return self.task_spec.get().Serialize()
def is_normal_task(self):
"""Whether this task is a normal task."""
return self.task_spec.get().IsNormalTask()
def is_actor_task(self):
"""Whether this task is an actor task."""
return self.task_spec.get().IsActorTask()
def is_actor_creation_task(self):
"""Whether this task is an actor creation task."""
return self.task_spec.get().IsActorCreationTask()
def job_id(self):
"""Return the job ID for this task."""
return JobID(self.task_spec.get().JobId().Binary())
@@ -206,24 +218,32 @@ cdef class TaskSpec:
def actor_creation_id(self):
"""Return the actor creation ID for the task."""
if not self.is_actor_creation_task():
return ActorID.nil()
return ActorID(self.task_spec.get().ActorCreationId().Binary())
def actor_creation_dummy_object_id(self):
"""Return the actor creation dummy object ID for the task."""
if not self.is_actor_task():
return ObjectID.nil()
return ObjectID(
self.task_spec.get().ActorCreationDummyObjectId().Binary())
def actor_id(self):
"""Return the actor ID for this task."""
if not self.is_actor_task():
return ActorID.nil()
return ActorID(self.task_spec.get().ActorId().Binary())
def actor_counter(self):
"""Return the actor counter for this task."""
if not self.is_actor_task():
return 0
return self.task_spec.get().ActorCounter()
cdef class TaskExecutionSpec:
"""Cython wrapper class of C++ `ray::raylet::TaskExecutionSpecification`."""
"""Cython wrapper class of C++ `ray::TaskExecutionSpecification`."""
cdef:
unique_ptr[CTaskExecutionSpec] c_spec
@@ -259,7 +279,7 @@ cdef class TaskExecutionSpec:
cdef class Task:
"""Cython wrapper class of C++ `ray::raylet::Task`."""
"""Cython wrapper class of C++ `ray::Task`."""
cdef:
unique_ptr[CTask] c_task
+8 -11
View File
@@ -868,7 +868,7 @@ class Worker(object):
assert self.current_task_id.is_nil()
assert self.task_context.task_index == 0
assert self.task_context.put_index == 1
if task.actor_id().is_nil():
if not task.is_actor_task():
# If this worker is not an actor, check that `current_job_id`
# was reset when the worker finished the previous task.
assert self.current_job_id.is_nil()
@@ -887,8 +887,7 @@ class Worker(object):
task.function_descriptor_list())
args = task.arguments()
return_object_ids = task.returns()
if (not task.actor_id().is_nil()
or not task.actor_creation_id().is_nil()):
if task.is_actor_task() or task.is_actor_creation_task():
dummy_return_id = return_object_ids.pop()
function_executor = function_execution_info.function
function_name = function_execution_info.function_name
@@ -911,11 +910,10 @@ class Worker(object):
try:
self._current_task = task
with profiling.profile("task:execute"):
if (task.actor_id().is_nil()
and task.actor_creation_id().is_nil()):
if task.is_normal_task():
outputs = function_executor(*arguments)
else:
if not task.actor_id().is_nil():
if task.is_actor_task():
key = task.actor_id()
else:
key = task.actor_creation_id()
@@ -924,7 +922,7 @@ class Worker(object):
except Exception as e:
# Determine whether the exception occured during a task, not an
# actor method.
task_exception = task.actor_id().is_nil()
task_exception = not task.is_actor_task()
traceback_str = ray.utils.format_error_message(
traceback.format_exc(), task_exception=task_exception)
self._handle_process_task_failure(
@@ -980,8 +978,7 @@ class Worker(object):
# TODO(rkn): It would be preferable for actor creation tasks to share
# more of the code path with regular task execution.
if not task.actor_creation_id().is_nil():
assert self.actor_id.is_nil()
if task.is_actor_creation_task():
self.actor_id = task.actor_creation_id()
self.actor_creation_task_id = task.task_id()
actor_class = self.function_actor_manager.load_actor_class(
@@ -999,8 +996,8 @@ class Worker(object):
# Execute the task.
function_name = execution_info.function_name
extra_data = {"name": function_name, "task_id": task.task_id().hex()}
if task.actor_id().is_nil():
if task.actor_creation_id().is_nil():
if not task.is_actor_task():
if not task.is_actor_creation_task():
title = "ray_worker:{}()".format(function_name)
next_title = "ray_worker"
else: