Refactor CoreWorker to remove TaskInterface (#5924)

* Remove TaskInterface

* Remove Status return value

* Remove CActorHandle, some return values, TaskSubmitter

* lint

* doc

* doc

* fix build

* lint

* Return Status, guarded by annotation, fail tasks for RECONSTRUCTING actors

* fix

* move annotation

* revert

* Fix core worker test

* nits
This commit is contained in:
Stephanie Wang
2019-10-18 00:03:57 -04:00
committed by GitHub
parent 3ac8592dcf
commit 697f765efc
17 changed files with 343 additions and 479 deletions
+13 -28
View File
@@ -23,7 +23,6 @@ from libcpp.vector cimport vector as c_vector
from cython.operator import dereference, postincrement
from ray.includes.common cimport (
CActorHandle,
CLanguage,
CRayObject,
CRayStatus,
@@ -630,7 +629,6 @@ cdef class CoreWorker:
CRayFunction ray_function
c_vector[CTaskArg] args_vector
c_vector[CObjectID] return_ids
CTaskID caller_id
with profiling.profile("submit_task"):
prepare_resources(resources, &c_resources)
@@ -638,11 +636,9 @@ cdef class CoreWorker:
ray_function = CRayFunction(
LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
prepare_args(args, &args_vector)
caller_id = self.core_worker.get().GetCallerId()
with nogil:
check_status(self.core_worker.get().Tasks().SubmitTask(
caller_id,
check_status(self.core_worker.get().SubmitTask(
ray_function, args_vector, task_options, &return_ids))
return VectorToObjectIDs(return_ids)
@@ -654,13 +650,12 @@ cdef class CoreWorker:
resources,
placement_resources):
cdef:
unique_ptr[CActorHandle] actor_handle
CRayFunction ray_function
c_vector[CTaskArg] args_vector
c_vector[c_string] dynamic_worker_options
unordered_map[c_string, double] c_resources
unordered_map[c_string, double] c_placement_resources
CTaskID caller_id
CActorID c_actor_id
with profiling.profile("submit_task"):
prepare_resources(resources, &c_resources)
@@ -668,22 +663,16 @@ cdef class CoreWorker:
ray_function = CRayFunction(
LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
prepare_args(args, &args_vector)
caller_id = self.core_worker.get().GetCallerId()
with nogil:
check_status(self.core_worker.get().Tasks().CreateActor(
caller_id,
check_status(self.core_worker.get().CreateActor(
ray_function, args_vector,
CActorCreationOptions(
max_reconstructions, False, c_resources,
c_placement_resources, dynamic_worker_options),
&actor_handle))
&c_actor_id))
actor_id = ActorID(actor_handle.get().GetActorID().Binary())
inserted = self.core_worker.get().AddActorHandle(
move(actor_handle))
assert inserted, "Actor {} already exists".format(actor_id)
return actor_id
return ActorID(c_actor_id.Binary())
def submit_actor_task(self,
ActorID actor_id,
@@ -699,7 +688,6 @@ cdef class CoreWorker:
CRayFunction ray_function
c_vector[CTaskArg] args_vector
c_vector[CObjectID] return_ids
CTaskID caller_id
with profiling.profile("submit_task"):
prepare_resources(resources, &c_resources)
@@ -707,12 +695,10 @@ cdef class CoreWorker:
ray_function = CRayFunction(
LANGUAGE_PYTHON, string_vector_from_list(function_descriptor))
prepare_args(args, &args_vector)
caller_id = self.core_worker.get().GetCallerId()
with nogil:
check_status(self.core_worker.get().Tasks().SubmitActorTask(
caller_id,
self.core_worker.get().GetActorHandle(c_actor_id),
check_status(self.core_worker.get().SubmitActorTask(
c_actor_id,
ray_function,
args_vector, task_options, &return_ids))
@@ -726,17 +712,16 @@ cdef class CoreWorker:
self.core_worker.get().CreateProfileEvent(c_event_type),
extra_data)
def deserialize_actor_handle(self, c_string bytes):
cdef:
unique_ptr[CActorHandle] actor_handle
actor_handle.reset(new CActorHandle(bytes))
actor_id = ActorID(actor_handle.get().GetActorID().Binary())
self.core_worker.get().AddActorHandle(move(actor_handle))
def deserialize_and_register_actor_handle(self, const c_string &bytes):
c_actor_id = self.core_worker.get().DeserializeAndRegisterActorHandle(
bytes)
actor_id = ActorID(c_actor_id.Binary())
return actor_id
def serialize_actor_handle(self, ActorID actor_id):
cdef:
CActorID c_actor_id = actor_id.native()
c_string output
self.core_worker.get().GetActorHandle(c_actor_id).Serialize(&output)
check_status(self.core_worker.get().SerializeActorHandle(
c_actor_id, &output))
return output
+2 -1
View File
@@ -631,7 +631,8 @@ class ActorHandle(object):
# TODO(swang): Accessing the worker's current task ID is not
# thread-safe.
# Local mode just uses the actor ID.
worker.core_worker.deserialize_actor_handle(state["core_handle"])
worker.core_worker.deserialize_and_register_actor_handle(
state["core_handle"])
if hasattr(worker, "core_worker") else state["core_handle"],
state["module_name"],
state["class_name"],
-7
View File
@@ -183,7 +183,6 @@ cdef extern from "ray/core_worker/common.h" nogil:
@staticmethod
CTaskArg PassByValue(const shared_ptr[CRayObject] &data)
cdef extern from "ray/core_worker/task_interface.h" nogil:
cdef cppclass CTaskOptions "ray::TaskOptions":
CTaskOptions()
CTaskOptions(int num_returns,
@@ -197,12 +196,6 @@ cdef extern from "ray/core_worker/task_interface.h" nogil:
const unordered_map[c_string, double] &placement_resources,
const c_vector[c_string] &dynamic_worker_options)
cdef cppclass CActorHandle "ray::ActorHandle":
CActorHandle(const c_string &serialized)
CActorID GetActorID() const
void Serialize(c_string *output)
cdef extern from "ray/gcs/gcs_client_interface.h" nogil:
cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions":
CGcsClientOptions(const c_string &ip, int port,
+15 -21
View File
@@ -12,7 +12,6 @@ from ray.includes.unique_ids cimport (
)
from ray.includes.common cimport (
CActorCreationOptions,
CActorHandle,
CBuffer,
CRayFunction,
CRayObject,
@@ -30,23 +29,6 @@ cdef extern from "ray/core_worker/profiling.h" nogil:
cdef cppclass CProfileEvent "ray::worker::ProfileEvent":
void SetExtraData(const c_string &extra_data)
cdef extern from "ray/core_worker/task_interface.h" namespace "ray" nogil:
cdef cppclass CTaskSubmissionInterface "CoreWorkerTaskInterface":
CRayStatus SubmitTask(
const CTaskID &caller_id,
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CTaskOptions &options, c_vector[CObjectID] *return_ids)
CRayStatus CreateActor(
const CTaskID &caller_id,
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CActorCreationOptions &options,
unique_ptr[CActorHandle] *handle)
CRayStatus SubmitActorTask(
const CTaskID &caller_id,
CActorHandle &handle, const CRayFunction &function,
const c_vector[CTaskArg] &args, const CTaskOptions &options,
c_vector[CObjectID] *return_ids)
cdef extern from "ray/core_worker/object_interface.h" nogil:
cdef cppclass CObjectInterface "ray::CoreWorkerObjectInterface":
CRayStatus SetClientOptions(c_string client_name, int64_t limit)
@@ -78,7 +60,18 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CWorkerType &GetWorkerType()
CLanguage &GetLanguage()
CObjectInterface &Objects()
CTaskSubmissionInterface &Tasks()
CRayStatus SubmitTask(
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CTaskOptions &options, c_vector[CObjectID] *return_ids)
CRayStatus CreateActor(
const CRayFunction &function, const c_vector[CTaskArg] &args,
const CActorCreationOptions &options, CActorID *actor_id)
CRayStatus SubmitActorTask(
const CActorID &actor_id, const CRayFunction &function,
const c_vector[CTaskArg] &args, const CTaskOptions &options,
c_vector[CObjectID] *return_ids)
# CTaskExecutionInterface &Execution()
unique_ptr[CProfileEvent] CreateProfileEvent(
const c_string &event_type)
@@ -94,5 +87,6 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
void SetActorId(const CActorID &actor_id)
const CActorID &GetActorId()
CTaskID GetCallerId()
c_bool AddActorHandle(unique_ptr[CActorHandle] handle)
CActorHandle &GetActorHandle(const CActorID &actor_id)
CActorID DeserializeAndRegisterActorHandle(const c_string &bytes)
CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string
*bytes)