diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 6c02bd069..9fc77d72a 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -836,5 +836,26 @@ def test_actor_creation_latency(ray_start_regular): actor_create_time - start, end - start)) +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }], indirect=True) +def test_detached_actor_local_mode(ray_start_regular): + RETURN_VALUE = 3 + + @ray.remote + class Y: + def f(self): + return RETURN_VALUE + + Y.options(name="test").remote() + y = ray.get_actor("test") + assert ray.get(y.f.remote()) == RETURN_VALUE + + ray.kill(y) + with pytest.raises(ValueError): + ray.get_actor("test") + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 57a53bf1b..f6479bea7 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1266,6 +1266,11 @@ Status CoreWorker::CreateActor(const RayFunction &function, TaskSpecification task_spec = builder.Build(); Status status; if (options_.is_local_mode) { + if (task_spec.IsDetachedActor()) { + // Since local mode doesn't pass GCS actor management code path, + // it just register actor names in memory. + local_mode_named_actor_registry_.emplace(actor_creation_options.name, actor_id); + } ExecuteTaskLocalMode(task_spec); } else { int max_retries; @@ -1359,6 +1364,10 @@ Status CoreWorker::CancelTask(const ObjectID &object_id, bool force_kill) { } Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill, bool no_restart) { + if (options_.is_local_mode) { + return KillActorLocalMode(actor_id); + } + if (!actor_manager_->CheckActorHandleExists(actor_id)) { std::stringstream stream; stream << "Failed to find a corresponding actor handle for " << actor_id; @@ -1368,6 +1377,20 @@ Status CoreWorker::KillActor(const ActorID &actor_id, bool force_kill, bool no_r return Status::OK(); } +// SANG-TODO +Status CoreWorker::KillActorLocalMode(const ActorID &actor_id) { + // KillActor doesn't do anything in local mode. We only remove named actor entry if + // exists. + for (auto it = local_mode_named_actor_registry_.begin(); + it != local_mode_named_actor_registry_.end();) { + auto current = it++; + if (current->second == actor_id) { + local_mode_named_actor_registry_.erase(current); + } + } + return Status::OK(); +} + void CoreWorker::RemoveActorHandleReference(const ActorID &actor_id) { ObjectID actor_handle_id = ObjectID::ForActorHandle(actor_id); reference_counter_->RemoveLocalReference(actor_handle_id, nullptr); @@ -1398,6 +1421,9 @@ std::pair CoreWorker::GetNamedActorHandle( const std::string &name) { RAY_CHECK(RayConfig::instance().gcs_actor_service_enabled()); RAY_CHECK(!name.empty()); + if (options_.is_local_mode) { + return GetNamedActorHandleLocalMode(name); + } // This call needs to be blocking because we can't return until the actor // handle is created, which requires the response from the RPC. This is @@ -1444,6 +1470,18 @@ std::pair CoreWorker::GetNamedActorHandle( return std::make_pair(GetActorHandle(actor_id), Status::OK()); } +std::pair CoreWorker::GetNamedActorHandleLocalMode( + const std::string &name) { + auto it = local_mode_named_actor_registry_.find(name); + if (it == local_mode_named_actor_registry_.end()) { + std::ostringstream stream; + stream << "Failed to look up actor with name '" << name; + return std::make_pair(nullptr, Status::NotFound(stream.str())); + } + + return std::make_pair(GetActorHandle(it->second), Status::OK()); +} + const ResourceMappingType CoreWorker::GetResourceIDs() const { absl::MutexLock lock(&mutex_); return *resource_ids_; diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index b4b7151cf..929064e70 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -885,6 +885,13 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void ExecuteTaskLocalMode(const TaskSpecification &task_spec, const ActorID &actor_id = ActorID::Nil()); + /// KillActor API for a local mode. + Status KillActorLocalMode(const ActorID &actor_id); + + /// Get a handle to a named actor for local mode. + std::pair GetNamedActorHandleLocalMode( + const std::string &name); + /// Get the values of the task arguments for the executor. Values are /// retrieved from the local plasma store or, if the value is inlined, from /// the task spec. @@ -1101,6 +1108,10 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { // Queue of tasks to resubmit when the specified time passes. std::deque> to_resubmit_ GUARDED_BY(mutex_); + /// Map of named actor registry. It doesn't need to hold a lock because + /// local mode is single-threaded. + absl::flat_hash_map local_mode_named_actor_registry_; + // Guard for `async_plasma_callbacks_` map. mutable absl::Mutex plasma_mutex_;