[Core] Actor handle refactoring (#8895)

* Marking needed changes.

* Resolve basic dependencies.

* In progress.

* linting.

* In progress 2.

* Linting.

* Refactor done. Cleanup needed.

* Linting.

* Recover kill actor in core worker because it is used inside raylet

* Cleanup.

* Use unique pointer instead. Unit tests are broken now.

* Fix the upstream change.

* Addressed code review 1.

* Lint.

* Addressed code review 2.

* Fix weird github history.

* Lint.

* Linting using clang 7.0.

* Use a better check message.

* Revert cpp stuff.

* Fix weird linting errors.

* Manuall fix all lint issues.

* Update a newline.

* Refactor some interface.

* Addressed all code review.

* Addressed code review
This commit is contained in:
SangBin Cho
2020-07-07 11:11:41 -07:00
committed by GitHub
parent f69d8b951e
commit 8f19f1eafb
18 changed files with 822 additions and 258 deletions
+11 -11
View File
@@ -27,16 +27,16 @@ from ray.includes.function_descriptor cimport (
)
cdef extern from *:
"""
#if __OPTIMIZE__ && __OPTIMIZE__ == 1
#undef __OPTIMIZE__
int __OPTIMIZE__ = 1;
#define __OPTIMIZE__ 1
#else
int __OPTIMIZE__ = 0;
#endif
"""
int __OPTIMIZE__
"""
#if __OPTIMIZE__ && __OPTIMIZE__ == 1
#undef __OPTIMIZE__
int __OPTIMIZE__ = 1;
#define __OPTIMIZE__ 1
#else
int __OPTIMIZE__ = 0;
#endif
"""
int __OPTIMIZE__
cdef extern from "Python.h":
# Note(simon): This is used to configure asyncio actor stack size.
@@ -98,7 +98,7 @@ cdef class CoreWorker:
self, worker, outputs, const c_vector[CObjectID] return_ids,
c_vector[shared_ptr[CRayObject]] *returns)
cdef yield_current_fiber(self, CFiberEvent &fiber_event)
cdef make_actor_handle(self, CActorHandle *c_actor_handle)
cdef make_actor_handle(self, const CActorHandle *c_actor_handle)
cdef class FunctionDescriptor:
cdef:
+14 -11
View File
@@ -112,8 +112,8 @@ include "includes/serialization.pxi"
include "includes/libcoreworker.pxi"
include "includes/global_state_accessor.pxi"
# Expose GCC & Clang macro to report whether C++ optimizations were enabled
# during compilation.
# Expose GCC & Clang macro to report
# whether C++ optimizations were enabled during compilation.
OPTIMIZED = __OPTIMIZE__
logger = logging.getLogger(__name__)
@@ -1014,7 +1014,7 @@ cdef class CoreWorker:
CCoreWorkerProcess.GetCoreWorker().RemoveActorHandleReference(
c_actor_id)
cdef make_actor_handle(self, CActorHandle *c_actor_handle):
cdef make_actor_handle(self, const CActorHandle *c_actor_handle):
worker = ray.worker.global_worker
worker.check_connected()
manager = worker.function_actor_manager
@@ -1058,24 +1058,27 @@ cdef class CoreWorker:
ObjectID
outer_object_id):
cdef:
CActorHandle* c_actor_handle
CObjectID c_outer_object_id = (outer_object_id.native() if
outer_object_id else
CObjectID.Nil())
c_actor_id = (CCoreWorkerProcess.GetCoreWorker()
c_actor_id = (CCoreWorkerProcess
.GetCoreWorker()
.DeserializeAndRegisterActorHandle(
bytes, c_outer_object_id))
check_status(CCoreWorkerProcess.GetCoreWorker().GetActorHandle(
c_actor_id, &c_actor_handle))
cdef:
# NOTE: This handle should not be stored anywhere.
const CActorHandle* c_actor_handle = (
CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id))
return self.make_actor_handle(c_actor_handle)
def get_named_actor_handle(self, const c_string &name):
cdef:
CActorHandle* c_actor_handle
# NOTE: This handle should not be stored anywhere.
const CActorHandle* c_actor_handle = (
CCoreWorkerProcess.GetCoreWorker().GetNamedActorHandle(name))
with nogil:
check_status(CCoreWorkerProcess.GetCoreWorker()
.GetNamedActorHandle(name, &c_actor_handle))
if c_actor_handle == NULL:
raise ValueError("Named Actor Handle Not Found")
return self.make_actor_handle(c_actor_handle)
def serialize_actor_handle(self, ActorID actor_id):
+2 -4
View File
@@ -123,10 +123,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil:
CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string
*bytes,
CObjectID *c_actor_handle_id)
CRayStatus GetActorHandle(const CActorID &actor_id,
CActorHandle **actor_handle) const
CRayStatus GetNamedActorHandle(const c_string &name,
CActorHandle **actor_handle)
const CActorHandle* GetActorHandle(const CActorID &actor_id) const
const CActorHandle* GetNamedActorHandle(const c_string &name)
void AddLocalReference(const CObjectID &object_id)
void RemoveLocalReference(const CObjectID &object_id)
const CAddress &GetRpcAddress() const
+3 -1
View File
@@ -74,7 +74,9 @@ def _register_actor(name, actor_handle):
exists = False
if exists:
raise ValueError("An actor with name={} already exists".format(name))
raise ValueError("An actor with name={} already exists or there "
"was timeout in getting this actor handle."
.format(name))
# Add the actor to Redis if it does not already exist.
_internal_kv_put(actor_name, pickle.dumps(actor_handle), overwrite=True)