mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 20:38:19 +08:00
[Core] Allow creating tasks/actors in a detached actor when driver has exited (#11493)
* Allow creating tasks/actors in a detached actor when driver has exited
* lint
* Address comment
This commit is contained in:
@@ -693,11 +693,24 @@ ray.init(address="{}")
|
||||
existing_actor = ray.get_actor("{}")
|
||||
assert ray.get(existing_actor.ping.remote()) == "pong"
|
||||
|
||||
@ray.remote
|
||||
def foo():
|
||||
return "bar"
|
||||
|
||||
@ray.remote
|
||||
class NonDetachedActor:
|
||||
def foo(self):
|
||||
return "bar"
|
||||
|
||||
@ray.remote
|
||||
class DetachedActor:
|
||||
def ping(self):
|
||||
return "pong"
|
||||
|
||||
def foobar(self):
|
||||
actor = NonDetachedActor.remote()
|
||||
return ray.get([foo.remote(), actor.foo.remote()])
|
||||
|
||||
actor = DetachedActor._remote(lifetime="detached", name="{}")
|
||||
ray.get(actor.ping.remote())
|
||||
""".format(redis_address, get_actor_name, create_actor_name)
|
||||
@@ -705,6 +718,9 @@ ray.get(actor.ping.remote())
|
||||
run_string_as_driver(driver_script)
|
||||
detached_actor = ray.get_actor(create_actor_name)
|
||||
assert ray.get(detached_actor.ping.remote()) == "pong"
|
||||
# Verify that a detached actor is able to create tasks/actors
|
||||
# even if the driver of the detached actor has exited.
|
||||
assert ray.get(detached_actor.foobar.remote()) == ["bar", "bar"]
|
||||
|
||||
|
||||
def test_detached_actor_cleanup(ray_start_regular):
|
||||
|
||||
@@ -181,8 +181,8 @@ Process WorkerPool::StartWorkerProcess(const Language &language,
|
||||
if (RayConfig::instance().enable_multi_tenancy() &&
|
||||
worker_type != rpc::WorkerType::IO_WORKER) {
|
||||
RAY_CHECK(!job_id.IsNil());
|
||||
auto it = unfinished_jobs_.find(job_id);
|
||||
if (it == unfinished_jobs_.end()) {
|
||||
auto it = all_jobs_.find(job_id);
|
||||
if (it == all_jobs_.end()) {
|
||||
RAY_LOG(DEBUG) << "Job config of job " << job_id << " are not local yet.";
|
||||
// Will reschedule ready tasks in `NodeManager::HandleJobStarted`.
|
||||
return Process();
|
||||
@@ -433,11 +433,13 @@ void WorkerPool::MarkPortAsFree(int port) {
|
||||
}
|
||||
|
||||
void WorkerPool::HandleJobStarted(const JobID &job_id, const rpc::JobConfig &job_config) {
|
||||
unfinished_jobs_[job_id] = job_config;
|
||||
all_jobs_[job_id] = job_config;
|
||||
}
|
||||
|
||||
void WorkerPool::HandleJobFinished(const JobID &job_id) {
|
||||
unfinished_jobs_.erase(job_id);
|
||||
// Currently we don't erase the job from `all_jobs_`, as a workaround for
|
||||
// https://github.com/ray-project/ray/issues/11437.
|
||||
// all_jobs_.erase(job_id);
|
||||
}
|
||||
|
||||
Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker,
|
||||
@@ -477,12 +479,12 @@ Status WorkerPool::RegisterWorker(const std::shared_ptr<WorkerInterface> &worker
|
||||
RAY_CHECK(dedicated_workers_it != state.worker_pids_to_assigned_jobs.end());
|
||||
auto job_id = dedicated_workers_it->second;
|
||||
|
||||
// If the job is finished, we don't allow new registrations.
|
||||
if (!unfinished_jobs_.contains(job_id)) {
|
||||
// If the job is unknown to Raylet, we don't allow new registrations.
|
||||
if (!all_jobs_.contains(job_id)) {
|
||||
auto process = Process::FromPid(pid);
|
||||
state.starting_worker_processes.erase(process);
|
||||
Status status =
|
||||
Status::Invalid("The job is not running anymore. Reject registration.");
|
||||
Status::Invalid("The provided job ID is unknown. Reject registration.");
|
||||
send_reply_callback(status, /*port=*/0);
|
||||
return status;
|
||||
}
|
||||
@@ -547,7 +549,7 @@ Status WorkerPool::RegisterDriver(const std::shared_ptr<WorkerInterface> &driver
|
||||
auto &state = GetStateForLanguage(driver->GetLanguage());
|
||||
state.registered_drivers.insert(std::move(driver));
|
||||
driver->AssignJobId(job_id);
|
||||
unfinished_jobs_[job_id] = job_config;
|
||||
all_jobs_[job_id] = job_config;
|
||||
|
||||
// This is a workaround to start initial workers on this node if and only if Raylet is
|
||||
// started by a Python driver and the job config is not set in `ray.init(...)`.
|
||||
|
||||
@@ -414,7 +414,7 @@ class WorkerPool : public WorkerPoolInterface {
|
||||
int num_initial_python_workers_for_first_job_;
|
||||
|
||||
/// This map tracks the latest job configs of all jobs known to this raylet. Finished
/// jobs are currently not removed from it (see `HandleJobFinished`).
|
||||
absl::flat_hash_map<JobID, rpc::JobConfig> unfinished_jobs_;
|
||||
absl::flat_hash_map<JobID, rpc::JobConfig> all_jobs_;
|
||||
|
||||
/// The pool of idle non-actor workers of all languages. This is used to kill idle
|
||||
/// workers in FIFO order. The second element of std::pair is the time a worker becomes
|
||||
|
||||
Reference in New Issue
Block a user