diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index 4c8ae1c3f..e8edc78a7 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -17,7 +17,12 @@ from ray.includes.common cimport ( CBuffer, CRayObject ) -from ray.includes.libcoreworker cimport CActorHandle, CFiberEvent +from ray.includes.libcoreworker cimport ( + ActorHandleSharedPtr, + CActorHandle, + CFiberEvent, +) + from ray.includes.unique_ids cimport ( CObjectID, CActorID @@ -101,7 +106,7 @@ cdef class CoreWorker: self, worker, outputs, const c_vector[CObjectID] return_ids, c_vector[shared_ptr[CRayObject]] *returns) cdef yield_current_fiber(self, CFiberEvent &fiber_event) - cdef make_actor_handle(self, const CActorHandle *c_actor_handle) + cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle) cdef class FunctionDescriptor: cdef: diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 1360c96ce..0d4f5b97b 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -79,6 +79,7 @@ from ray.includes.unique_ids cimport ( CPlacementGroupID, ) from ray.includes.libcoreworker cimport ( + ActorHandleSharedPtr, CActorCreationOptions, CPlacementGroupCreationOptions, CCoreWorkerOptions, @@ -1321,22 +1322,22 @@ cdef class CoreWorker: CCoreWorkerProcess.GetCoreWorker().RemoveActorHandleReference( c_actor_id) - cdef make_actor_handle(self, const CActorHandle *c_actor_handle): + cdef make_actor_handle(self, ActorHandleSharedPtr c_actor_handle): worker = ray.worker.global_worker worker.check_connected() manager = worker.function_actor_manager - actor_id = ActorID(c_actor_handle.GetActorID().Binary()) - job_id = JobID(c_actor_handle.CreationJobID().Binary()) - language = Language.from_native(c_actor_handle.ActorLanguage()) - actor_creation_function_descriptor = \ - CFunctionDescriptorToPython( - c_actor_handle.ActorCreationTaskFunctionDescriptor()) + actor_id = ActorID(dereference(c_actor_handle).GetActorID().Binary()) + job_id = JobID(dereference(c_actor_handle).CreationJobID().Binary()) + language = Language.from_native( + dereference(c_actor_handle).ActorLanguage()) + actor_creation_function_descriptor = CFunctionDescriptorToPython( + dereference(c_actor_handle).ActorCreationTaskFunctionDescriptor()) if language == Language.PYTHON: assert isinstance(actor_creation_function_descriptor, PythonFunctionDescriptor) # Load actor_method_cpu from actor handle's extension data. - extension_data = c_actor_handle.ExtensionData() + extension_data = dereference(c_actor_handle).ExtensionData() if extension_data: actor_method_cpu = int(extension_data) else: @@ -1372,27 +1373,21 @@ cdef class CoreWorker: .GetCoreWorker() .DeserializeAndRegisterActorHandle( bytes, c_outer_object_id)) - cdef: - # NOTE: This handle should not be stored anywhere. - const CActorHandle* c_actor_handle = ( - CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id)) - return self.make_actor_handle(c_actor_handle) + return self.make_actor_handle( + CCoreWorkerProcess.GetCoreWorker().GetActorHandle(c_actor_id)) def get_named_actor_handle(self, const c_string &name): cdef: - pair[const CActorHandle*, CRayStatus] named_actor_handle_pair - # NOTE: This handle should not be stored anywhere. - const CActorHandle* c_actor_handle + pair[ActorHandleSharedPtr, CRayStatus] named_actor_handle_pair # We need it because GetNamedActorHandle needs # to call a method that holds the gil. with nogil: named_actor_handle_pair = ( CCoreWorkerProcess.GetCoreWorker().GetNamedActorHandle(name)) - c_actor_handle = named_actor_handle_pair.first check_status(named_actor_handle_pair.second) - return self.make_actor_handle(c_actor_handle) + return self.make_actor_handle(named_actor_handle_pair.first) def serialize_actor_handle(self, ActorID actor_id): cdef: diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 9dd63aafe..849cc70b2 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -49,6 +49,11 @@ ctypedef void (*ray_callback_function) \ ctypedef void (*plasma_callback_function) \ (CObjectID object_id, int64_t data_size, int64_t metadata_size) +# NOTE: This ctypedef is needed, because Cython doesn't compile +# "pair[shared_ptr[const CActorHandle], CRayStatus]". +# This is a bug of cython: https://github.com/cython/cython/issues/3967. +ctypedef shared_ptr[const CActorHandle] ActorHandleSharedPtr + cdef extern from "ray/core_worker/profiling.h" nogil: cdef cppclass CProfiler "ray::worker::Profiler": void Start() @@ -140,8 +145,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus SerializeActorHandle(const CActorID &actor_id, c_string *bytes, CObjectID *c_actor_handle_id) - const CActorHandle* GetActorHandle(const CActorID &actor_id) const - pair[const CActorHandle*, CRayStatus] GetNamedActorHandle( + ActorHandleSharedPtr GetActorHandle(const CActorID &actor_id) const + pair[ActorHandleSharedPtr, CRayStatus] GetNamedActorHandle( const c_string &name) void AddLocalReference(const CObjectID &object_id) void RemoveLocalReference(const CObjectID &object_id) diff --git a/src/ray/core_worker/actor_manager.cc b/src/ray/core_worker/actor_manager.cc index 73ca9ec34..a9a24c3df 100644 --- a/src/ray/core_worker/actor_manager.cc +++ b/src/ray/core_worker/actor_manager.cc @@ -35,8 +35,7 @@ ActorID ActorManager::RegisterActorHandle(std::unique_ptr actor_han return actor_id; } -const std::unique_ptr &ActorManager::GetActorHandle( - const ActorID &actor_id) { +std::shared_ptr ActorManager::GetActorHandle(const ActorID &actor_id) { absl::MutexLock lock(&mutex_); auto it = actor_handles_.find(actor_id); RAY_CHECK(it != actor_handles_.end()) diff --git a/src/ray/core_worker/actor_manager.h b/src/ray/core_worker/actor_manager.h index ff47b7403..883877df8 100644 --- a/src/ray/core_worker/actor_manager.h +++ b/src/ray/core_worker/actor_manager.h @@ -109,7 +109,7 @@ class ActorManager { /// \param[in] actor_id The actor handle to get. /// \return reference to the actor_handle's pointer. /// NOTE: Returned actorHandle should not be stored anywhere. - const std::unique_ptr &GetActorHandle(const ActorID &actor_id); + std::shared_ptr GetActorHandle(const ActorID &actor_id); /// Check if an actor handle that corresponds to an actor_id exists. /// \param[in] actor_id The actor id of a handle. @@ -193,7 +193,7 @@ class ActorManager { /// Map from actor ID to a handle to that actor. /// Actor handle is a logical abstraction that holds actor handle's states. - absl::flat_hash_map> actor_handles_ + absl::flat_hash_map> actor_handles_ GUARDED_BY(mutex_); }; diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index da44b1453..3cd8e9825 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -440,7 +440,7 @@ CoreWorker::CoreWorker(const CoreWorkerOptions &options, const WorkerID &worker_ RAY_LOG(ERROR) << "Resubmitting task that produced lost plasma object: " << spec.DebugString(); if (spec.IsActorTask()) { - const auto &actor_handle = actor_manager_->GetActorHandle(spec.ActorId()); + auto actor_handle = actor_manager_->GetActorHandle(spec.ActorId()); actor_handle->SetResubmittedActorTaskSpec(spec, spec.ActorDummyObject()); RAY_CHECK_OK(direct_actor_submitter_->SubmitTask(spec)); } else { @@ -1500,8 +1500,7 @@ void CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &fun const std::vector> &args, const TaskOptions &task_options, std::vector *return_ids) { - const std::unique_ptr &actor_handle = - actor_manager_->GetActorHandle(actor_id); + auto actor_handle = actor_manager_->GetActorHandle(actor_id); // Add one for actor cursor object id for tasks. const int num_returns = task_options.num_returns + 1; @@ -1628,18 +1627,18 @@ ActorID CoreWorker::DeserializeAndRegisterActorHandle(const std::string &seriali Status CoreWorker::SerializeActorHandle(const ActorID &actor_id, std::string *output, ObjectID *actor_handle_id) const { - const std::unique_ptr &actor_handle = - actor_manager_->GetActorHandle(actor_id); + auto actor_handle = actor_manager_->GetActorHandle(actor_id); actor_handle->Serialize(output); *actor_handle_id = ObjectID::ForActorHandle(actor_id); return Status::OK(); } -const ActorHandle *CoreWorker::GetActorHandle(const ActorID &actor_id) const { - return actor_manager_->GetActorHandle(actor_id).get(); +std::shared_ptr CoreWorker::GetActorHandle( + const ActorID &actor_id) const { + return actor_manager_->GetActorHandle(actor_id); } -std::pair CoreWorker::GetNamedActorHandle( +std::pair, Status> CoreWorker::GetNamedActorHandle( const std::string &name) { RAY_CHECK(!name.empty()); if (options_.is_local_mode) { @@ -1693,8 +1692,8 @@ std::pair CoreWorker::GetNamedActorHandle( return std::make_pair(GetActorHandle(actor_id), Status::OK()); } -std::pair CoreWorker::GetNamedActorHandleLocalMode( - const std::string &name) { +std::pair, Status> +CoreWorker::GetNamedActorHandleLocalMode(const std::string &name) { auto it = local_mode_named_actor_registry_.find(name); if (it == local_mode_named_actor_registry_.end()) { std::ostringstream stream; diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index 1358af3de..baa8561f7 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -786,7 +786,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// /// \param[in] actor_id The actor handle to get. /// \return Status::Invalid if we don't have this actor handle. - const ActorHandle *GetActorHandle(const ActorID &actor_id) const; + std::shared_ptr GetActorHandle(const ActorID &actor_id) const; /// Get a handle to a named actor. /// @@ -794,9 +794,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// /// \param[in] name The name of the actor whose handle to get. /// \param[out] actor_handle A handle to the requested actor. - /// \return The raw pointer to the actor handle if found, nullptr otherwise. + /// \return The shared_ptr to the actor handle if found, nullptr otherwise. /// The second pair contains the status of getting a named actor handle. - std::pair GetNamedActorHandle(const std::string &name); + std::pair, Status> GetNamedActorHandle( + const std::string &name); /// /// The following methods are handlers for the core worker's gRPC server, which follow @@ -991,7 +992,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { Status KillActorLocalMode(const ActorID &actor_id); /// Get a handle to a named actor for local mode. - std::pair GetNamedActorHandleLocalMode( + std::pair, Status> GetNamedActorHandleLocalMode( const std::string &name); /// Get the values of the task arguments for the executor. Values are diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc index b7fb72310..4823f73a0 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_RayNativeRuntime.cc @@ -280,7 +280,7 @@ Java_io_ray_runtime_RayNativeRuntime_nativeGetActorIdOfNamedActor(JNIEnv *env, j const char *native_actor_name = env->GetStringUTFChars(actor_name, JNI_FALSE); auto full_name = GetActorFullName(global, native_actor_name); - const auto *actor_handle = + const auto actor_handle = ray::CoreWorkerProcess::GetCoreWorker().GetNamedActorHandle(full_name).first; ray::ActorID actor_id; if (actor_handle) { diff --git a/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc b/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc index 191df0b0b..76b98f796 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_actor_NativeActorHandle.cc @@ -29,7 +29,7 @@ extern "C" { JNIEXPORT jint JNICALL Java_io_ray_runtime_actor_NativeActorHandle_nativeGetLanguage( JNIEnv *env, jclass o, jbyteArray actorId) { auto actor_id = JavaByteArrayToId(env, actorId); - const ray::ActorHandle *native_actor_handle = + const auto native_actor_handle = ray::CoreWorkerProcess::GetCoreWorker().GetActorHandle(actor_id); return native_actor_handle->ActorLanguage(); } @@ -38,7 +38,7 @@ JNIEXPORT jobject JNICALL Java_io_ray_runtime_actor_NativeActorHandle_nativeGetActorCreationTaskFunctionDescriptor( JNIEnv *env, jclass o, jbyteArray actorId) { auto actor_id = JavaByteArrayToId(env, actorId); - const ray::ActorHandle *native_actor_handle = + const auto native_actor_handle = ray::CoreWorkerProcess::GetCoreWorker().GetActorHandle(actor_id); auto function_descriptor = native_actor_handle->ActorCreationTaskFunctionDescriptor(); return NativeRayFunctionDescriptorToJavaStringList(env, function_descriptor); diff --git a/src/ray/core_worker/test/actor_manager_test.cc b/src/ray/core_worker/test/actor_manager_test.cc index cd4a21408..d6c8edd71 100644 --- a/src/ray/core_worker/test/actor_manager_test.cc +++ b/src/ray/core_worker/test/actor_manager_test.cc @@ -183,7 +183,7 @@ TEST_F(ActorManagerTest, TestAddAndGetActorHandleEndToEnd) { ASSERT_FALSE(actor_manager_->AddNewActorHandle(move(actor_handle2), task_id, call_site, caller_address, false)); // Make sure we can get an actor handle correctly. - const std::unique_ptr &actor_handle_to_get = + const std::shared_ptr actor_handle_to_get = actor_manager_->GetActorHandle(actor_id); ASSERT_TRUE(actor_handle_to_get->GetActorID() == actor_id); @@ -230,7 +230,7 @@ TEST_F(ActorManagerTest, RegisterActorHandles) { std::move(actor_handle), outer_object_id, task_id, call_site, caller_address); ASSERT_TRUE(returned_actor_id == actor_id); // Let's try to get the handle and make sure it works. - const std::unique_ptr &actor_handle_to_get = + const std::shared_ptr actor_handle_to_get = actor_manager_->GetActorHandle(actor_id); ASSERT_TRUE(actor_handle_to_get->GetActorID() == actor_id); ASSERT_TRUE(actor_handle_to_get->CreationJobID() == job_id);