diff --git a/python/ray/tests/test_actor_advanced.py b/python/ray/tests/test_actor_advanced.py index cdffe6afa..b599ec5c1 100644 --- a/python/ray/tests/test_actor_advanced.py +++ b/python/ray/tests/test_actor_advanced.py @@ -693,11 +693,24 @@ ray.init(address="{}") existing_actor = ray.get_actor("{}") assert ray.get(existing_actor.ping.remote()) == "pong" +@ray.remote +def foo(): + return "bar" + +@ray.remote +class NonDetachedActor: + def foo(self): + return "bar" + @ray.remote class DetachedActor: def ping(self): return "pong" + def foobar(self): + actor = NonDetachedActor.remote() + return ray.get([foo.remote(), actor.foo.remote()]) + actor = DetachedActor._remote(lifetime="detached", name="{}") ray.get(actor.ping.remote()) """.format(redis_address, get_actor_name, create_actor_name) @@ -705,6 +718,9 @@ ray.get(actor.ping.remote()) run_string_as_driver(driver_script) detached_actor = ray.get_actor(create_actor_name) assert ray.get(detached_actor.ping.remote()) == "pong" + # Verify that a detached actor is able to create tasks/actors + # even if the driver of the detached actor has exited. + assert ray.get(detached_actor.foobar.remote()) == ["bar", "bar"] def test_detached_actor_cleanup(ray_start_regular): diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 2d32e79da..ab092cd7d 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -181,8 +181,8 @@ Process WorkerPool::StartWorkerProcess(const Language &language, if (RayConfig::instance().enable_multi_tenancy() && worker_type != rpc::WorkerType::IO_WORKER) { RAY_CHECK(!job_id.IsNil()); - auto it = unfinished_jobs_.find(job_id); - if (it == unfinished_jobs_.end()) { + auto it = all_jobs_.find(job_id); + if (it == all_jobs_.end()) { RAY_LOG(DEBUG) << "Job config of job " << job_id << " are not local yet."; // Will reschedule ready tasks in `NodeManager::HandleJobStarted`. 
return Process(); @@ -433,11 +433,13 @@ void WorkerPool::MarkPortAsFree(int port) { } void WorkerPool::HandleJobStarted(const JobID &job_id, const rpc::JobConfig &job_config) { - unfinished_jobs_[job_id] = job_config; + all_jobs_[job_id] = job_config; } void WorkerPool::HandleJobFinished(const JobID &job_id) { - unfinished_jobs_.erase(job_id); + // Currently we don't erase the job from `all_jobs_`, as a workaround for + // https://github.com/ray-project/ray/issues/11437. + // all_jobs_.erase(job_id); } Status WorkerPool::RegisterWorker(const std::shared_ptr &worker, @@ -477,12 +479,12 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr &worker RAY_CHECK(dedicated_workers_it != state.worker_pids_to_assigned_jobs.end()); auto job_id = dedicated_workers_it->second; - // If the job is finished, we don't allow new registrations. - if (!unfinished_jobs_.contains(job_id)) { + // If the job is unknown to Raylet, we don't allow new registrations. + if (!all_jobs_.contains(job_id)) { auto process = Process::FromPid(pid); state.starting_worker_processes.erase(process); Status status = - Status::Invalid("The job is not running anymore. Reject registration."); + Status::Invalid("The provided job ID is unknown. Reject registration."); send_reply_callback(status, /*port=*/0); return status; } @@ -547,7 +549,7 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr &driver auto &state = GetStateForLanguage(driver->GetLanguage()); state.registered_drivers.insert(std::move(driver)); driver->AssignJobId(job_id); - unfinished_jobs_[job_id] = job_config; + all_jobs_[job_id] = job_config; // This is a workaround to start initial workers on this node if and only if Raylet is // started by a Python driver and the job config is not set in `ray.init(...)`. 
diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 981493a2d..5fbfce630 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -414,7 +414,7 @@ class WorkerPool : public WorkerPoolInterface { int num_initial_python_workers_for_first_job_; - /// This map tracks the latest infos of unfinished jobs. - absl::flat_hash_map unfinished_jobs_; + /// This map tracks the latest infos of all known jobs. Finished jobs are currently not erased (see `HandleJobFinished`). + absl::flat_hash_map all_jobs_; /// The pool of idle non-actor workers of all languages. This is used to kill idle /// workers in FIFO order. The second element of std::pair is the time a worker becomes