diff --git a/doc/source/placement-group.rst b/doc/source/placement-group.rst index 6fe8bc3a8..1424b850c 100644 --- a/doc/source/placement-group.rst +++ b/doc/source/placement-group.rst @@ -252,6 +252,42 @@ Note that you can anytime remove the placement group to clean up resources. ray.shutdown() +Placement Group Lifetimes +------------------------- + +.. tabs:: + .. group-tab:: Python + + By default, a placement group is not detached: it is destroyed when the driver + that created it terminates (or, if it was created from a detached actor, when that + detached actor is killed). If you'd like to keep the placement group alive + regardless of its creator job or detached actor, specify + ``lifetime="detached"``. For example: + + .. code-block:: python + + # first_driver.py + pg = placement_group([{"CPU": 2}, {"CPU": 2}], strategy="STRICT_SPREAD", lifetime="detached") + ray.get(pg.ready()) + + The placement group's lifetime is now independent of the driver. This means it + is possible to retrieve the placement group from other drivers regardless of when + the current driver exits. Let's see an example: + + .. code-block:: python + + # second_driver.py + table = ray.util.placement_group_table() + print(len(table)) + + Note that the lifetime option is decoupled from the name. If we only specify + the name without specifying ``lifetime="detached"``, then the placement group can + only be retrieved as long as the original driver is still running. + + .. group-tab:: Java + + The lifetime argument is not implemented for Java APIs yet. + Tips for Using Placement Groups ------------------------------- - Learn the :ref:`lifecycle ` of placement groups. 
diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 8ba80852f..0fc3f4bf2 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -1184,7 +1184,8 @@ cdef class CoreWorker: self, c_string name, c_vector[unordered_map[c_string, double]] bundles, - c_string strategy): + c_string strategy, + c_bool is_detached): cdef: CPlacementGroupID c_placement_group_id CPlacementStrategy c_strategy @@ -1208,7 +1209,8 @@ cdef class CoreWorker: CPlacementGroupCreationOptions( name, c_strategy, - bundles + bundles, + is_detached ), &c_placement_group_id)) diff --git a/python/ray/actor.py b/python/ray/actor.py index 499cd1eac..547a2929d 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -584,7 +584,9 @@ class ActorClass: elif lifetime == "detached": detached = True else: - raise ValueError("lifetime must be either `None` or 'detached'") + raise ValueError( + "actor `lifetime` argument must be either `None` or 'detached'" + ) if placement_group_capture_child_tasks is None: placement_group_capture_child_tasks = ( diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index a7ba4b23b..679ff6f0a 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -270,7 +270,8 @@ cdef extern from "ray/core_worker/common.h" nogil: CPlacementGroupCreationOptions( const c_string &name, CPlacementStrategy strategy, - const c_vector[unordered_map[c_string, double]] &bundles + const c_vector[unordered_map[c_string, double]] &bundles, + c_bool is_detached ) cdef extern from "ray/gcs/gcs_client.h" nogil: diff --git a/python/ray/tests/test_placement_group.py b/python/ray/tests/test_placement_group.py index 7c5963f9e..87273a499 100644 --- a/python/ray/tests/test_placement_group.py +++ b/python/ray/tests/test_placement_group.py @@ -1309,6 +1309,119 @@ def test_schedule_placement_groups_at_the_same_time(): wait_for_condition(is_all_placement_group_removed) + ray.shutdown() + + +def 
test_detached_placement_group(ray_start_cluster): + cluster = ray_start_cluster + for _ in range(2): + cluster.add_node(num_cpus=3) + cluster.wait_for_nodes() + info = ray.init(address=cluster.address) + + # Make sure detached placement group will alive when job dead. + driver_code = f""" +import ray + +ray.init(address="{info["redis_address"]}") + +pg = ray.util.placement_group( + [{{"CPU": 1}} for _ in range(2)], + strategy="STRICT_SPREAD", lifetime="detached") +ray.get(pg.ready()) + +@ray.remote(num_cpus=1) +class Actor: + def ready(self): + return True + +for bundle_index in range(2): + actor = Actor.options(lifetime="detached", placement_group=pg, + placement_group_bundle_index=bundle_index).remote() + ray.get(actor.ready.remote()) + +ray.shutdown() + """ + + run_string_as_driver(driver_code) + + # Wait until the driver is reported as dead by GCS. + def is_job_done(): + jobs = ray.jobs() + for job in jobs: + if "StopTime" in job: + return True + return False + + def assert_alive_num_pg(expected_num_pg): + alive_num_pg = 0 + for _, placement_group_info in ray.util.placement_group_table().items( + ): + if placement_group_info["state"] == "CREATED": + alive_num_pg += 1 + return alive_num_pg == expected_num_pg + + def assert_alive_num_actor(expected_num_actor): + alive_num_actor = 0 + for actor_info in ray.actors().values(): + if actor_info["State"] == ray.gcs_utils.ActorTableData.ALIVE: + alive_num_actor += 1 + return alive_num_actor == expected_num_actor + + wait_for_condition(is_job_done) + + assert assert_alive_num_pg(1) + assert assert_alive_num_actor(2) + + # Make sure detached placement group will alive when its creator which + # is detached actor dead. + # Test actors first. 
+ @ray.remote(num_cpus=1) + class NestedActor: + def ready(self): + return True + + @ray.remote(num_cpus=1) + class Actor: + def __init__(self): + self.actors = [] + + def ready(self): + return True + + def schedule_nested_actor_with_detached_pg(self): + # Create placement group which is detached. + pg = ray.util.placement_group( + [{ + "CPU": 1 + } for _ in range(2)], + strategy="STRICT_SPREAD", + lifetime="detached", + name="detached_pg") + ray.get(pg.ready()) + # Schedule nested actor with the placement group. + for bundle_index in range(2): + actor = NestedActor.options( + placement_group=pg, + placement_group_bundle_index=bundle_index, + lifetime="detached").remote() + ray.get(actor.ready.remote()) + self.actors.append(actor) + + a = Actor.options(lifetime="detached").remote() + ray.get(a.ready.remote()) + # 1 parent actor and 2 children actor. + ray.get(a.schedule_nested_actor_with_detached_pg.remote()) + + # Kill an actor and wait until it is killed. + ray.kill(a) + with pytest.raises(ray.exceptions.RayActorError): + ray.get(a.ready.remote()) + + # We should have 2 alive pgs and 4 alive actors. + assert assert_alive_num_pg(2) + assert assert_alive_num_actor(4) + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/util/placement_group.py b/python/ray/util/placement_group.py index be24772ab..6d15f607f 100644 --- a/python/ray/util/placement_group.py +++ b/python/ray/util/placement_group.py @@ -145,7 +145,8 @@ class PlacementGroup: def placement_group(bundles: List[Dict[str, float]], strategy: str = "PACK", - name: str = "unnamed_group") -> PlacementGroup: + name: str = "unnamed_group", + lifetime=None) -> PlacementGroup: """Asynchronously creates a PlacementGroup. Args: @@ -160,6 +161,10 @@ def placement_group(bundles: List[Dict[str, float]], - "STRICT_SPREAD": Packs Bundles across distinct nodes. name(str): The name of the placement group. 
+ lifetime(str): Either `None`, which defaults to the placement group + will fate share with its creator and will be deleted once its + creator is dead, or "detached", which means the placement group + will live as a global object independent of the creator. Return: PlacementGroup: Placement group object. @@ -179,8 +184,16 @@ def placement_group(bundles: List[Dict[str, float]], "Bundles cannot be an empty dictionary or " f"resources with only 0 values. Bundles: {bundles}") + if lifetime is None: + detached = False + elif lifetime == "detached": + detached = True + else: + raise ValueError("placement group `lifetime` argument must be either" + " `None` or 'detached'") + placement_group_id = worker.core_worker.create_placement_group( - name, bundles, strategy) + name, bundles, strategy, detached) return PlacementGroup(placement_group_id) diff --git a/src/ray/common/placement_group.h b/src/ray/common/placement_group.h index a068ce4a1..532f69d74 100644 --- a/src/ray/common/placement_group.h +++ b/src/ray/common/placement_group.h @@ -67,8 +67,9 @@ class PlacementGroupSpecBuilder { PlacementGroupSpecBuilder &SetPlacementGroupSpec( const PlacementGroupID &placement_group_id, std::string name, const std::vector> &bundles, - const rpc::PlacementStrategy strategy, const JobID &creator_job_id, - const ActorID &creator_actor_id, bool is_creator_detached_actor) { + const rpc::PlacementStrategy strategy, const bool is_detached, + const JobID &creator_job_id, const ActorID &creator_actor_id, + bool is_creator_detached_actor) { message_->set_placement_group_id(placement_group_id.Binary()); message_->set_name(name); message_->set_strategy(strategy); @@ -82,6 +83,7 @@ class PlacementGroupSpecBuilder { message_->set_creator_job_dead(is_creator_detached_actor); message_->set_creator_actor_id(creator_actor_id.Binary()); message_->set_creator_actor_dead(creator_actor_id.IsNil()); + message_->set_is_detached(is_detached); for (size_t i = 0; i < bundles.size(); i++) { auto resources = 
bundles[i]; diff --git a/src/ray/core_worker/common.h b/src/ray/core_worker/common.h index 1716fe606..bb10aff95 100644 --- a/src/ray/core_worker/common.h +++ b/src/ray/core_worker/common.h @@ -144,8 +144,11 @@ using PlacementStrategy = rpc::PlacementStrategy; struct PlacementGroupCreationOptions { PlacementGroupCreationOptions( std::string name, PlacementStrategy strategy, - std::vector> bundles) - : name(std::move(name)), strategy(strategy), bundles(std::move(bundles)) {} + std::vector> bundles, bool is_detached) + : name(std::move(name)), + strategy(strategy), + bundles(std::move(bundles)), + is_detached(is_detached) {} /// The name of the placement group. const std::string name; @@ -153,6 +156,8 @@ struct PlacementGroupCreationOptions { const PlacementStrategy strategy = rpc::PACK; /// The resource bundles in this placement group. const std::vector> bundles; + /// Whether to keep the placement group persistent after its creator dead. + const bool is_detached = false; }; } // namespace ray diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index f7e473eca..2f5dcc57e 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -1463,8 +1463,8 @@ Status CoreWorker::CreatePlacementGroup( builder.SetPlacementGroupSpec( placement_group_id, placement_group_creation_options.name, placement_group_creation_options.bundles, placement_group_creation_options.strategy, - worker_context_.GetCurrentJobID(), worker_context_.GetCurrentActorID(), - worker_context_.CurrentActorDetached()); + placement_group_creation_options.is_detached, worker_context_.GetCurrentJobID(), + worker_context_.GetCurrentActorID(), worker_context_.CurrentActorDetached()); PlacementGroupSpecification placement_group_spec = builder.Build(); *return_placement_group_id = placement_group_id; RAY_LOG(INFO) << "Submitting Placement Group creation to GCS: " << placement_group_id; diff --git 
a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc index 5470f70fb..cd374b76a 100644 --- a/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/io_ray_runtime_task_NativeTaskSubmitter.cc @@ -201,7 +201,8 @@ inline ray::PlacementGroupCreationOptions ToPlacementGroupCreationOptions( }); }); return ray::PlacementGroupCreationOptions(JavaStringToNativeString(env, name), - ConvertStrategy(java_strategy), bundles); + ConvertStrategy(java_strategy), bundles, + /*is_detached=*/false); } #ifdef __cplusplus diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc index b56f6b1d3..a856002b6 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.cc @@ -96,11 +96,15 @@ void GcsPlacementGroup::MarkCreatorActorDead() { placement_group_table_data_.set_creator_actor_dead(true); } -bool GcsPlacementGroup::IsPlacementGroupRemovable() const { - return placement_group_table_data_.creator_job_dead() && +bool GcsPlacementGroup::IsPlacementGroupLifetimeDone() const { + return !IsDetached() && placement_group_table_data_.creator_job_dead() && placement_group_table_data_.creator_actor_dead(); } +bool GcsPlacementGroup::IsDetached() const { + return placement_group_table_data_.is_detached(); +} + ///////////////////////////////////////////////////////////////////////////////////////// GcsPlacementGroupManager::GcsPlacementGroupManager( @@ -495,7 +499,7 @@ void GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenJobDead( continue; } placement_group->MarkCreatorJobDead(); - if (placement_group->IsPlacementGroupRemovable()) { + if (placement_group->IsPlacementGroupLifetimeDone()) { RemovePlacementGroup(placement_group->GetPlacementGroupID(), [](Status status) {}); } } @@ -509,7 +513,7 @@ void 
GcsPlacementGroupManager::CleanPlacementGroupIfNeededWhenActorDead( continue; } placement_group->MarkCreatorActorDead(); - if (placement_group->IsPlacementGroupRemovable()) { + if (placement_group->IsPlacementGroupLifetimeDone()) { RemovePlacementGroup(placement_group->GetPlacementGroupID(), [](Status status) {}); } } diff --git a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h index c76849108..28ce82090 100644 --- a/src/ray/gcs/gcs_server/gcs_placement_group_manager.h +++ b/src/ray/gcs/gcs_server/gcs_placement_group_manager.h @@ -61,6 +61,7 @@ class GcsPlacementGroup { placement_group_spec.creator_job_dead()); placement_group_table_data_.set_creator_actor_dead( placement_group_spec.creator_actor_dead()); + placement_group_table_data_.set_is_detached(placement_group_spec.is_detached()); } /// Get the immutable PlacementGroupTableData of this placement group. @@ -107,8 +108,11 @@ class GcsPlacementGroup { /// Mark that the creator actor of this placement group is dead. void MarkCreatorActorDead(); - /// Return True if the placement group is removable. False otherwise. - bool IsPlacementGroupRemovable() const; + /// Return True if the placement group lifetime is done. False otherwise. + bool IsPlacementGroupLifetimeDone() const; + + /// Returns whether or not this is a detached placement group. 
+ bool IsDetached() const; private: /// The placement_group meta data which contains the task specification as well as the diff --git a/src/ray/gcs/test/gcs_test_util.h b/src/ray/gcs/test/gcs_test_util.h index bf908c3a2..4d51fdd86 100644 --- a/src/ray/gcs/test/gcs_test_util.h +++ b/src/ray/gcs/test/gcs_test_util.h @@ -101,8 +101,9 @@ struct Mocker { PlacementGroupSpecBuilder builder; auto placement_group_id = PlacementGroupID::FromRandom(); - builder.SetPlacementGroupSpec(placement_group_id, name, bundles, strategy, job_id, - actor_id, /* is_creator_detached */ false); + builder.SetPlacementGroupSpec(placement_group_id, name, bundles, strategy, + /* is_detached */ false, job_id, actor_id, + /* is_creator_detached */ false); return builder.Build(); } diff --git a/src/ray/protobuf/common.proto b/src/ray/protobuf/common.proto index cc3149e84..844f44bea 100644 --- a/src/ray/protobuf/common.proto +++ b/src/ray/protobuf/common.proto @@ -233,6 +233,8 @@ message PlacementGroupSpec { bool creator_job_dead = 7; // Whether or not if the creator actor is dead. bool creator_actor_dead = 8; + // Whether the placement group is persistent. + bool is_detached = 9; } message ObjectReference { diff --git a/src/ray/protobuf/gcs.proto b/src/ray/protobuf/gcs.proto index 1e59ae812..902c29cb7 100644 --- a/src/ray/protobuf/gcs.proto +++ b/src/ray/protobuf/gcs.proto @@ -191,6 +191,8 @@ message PlacementGroupTableData { bool creator_job_dead = 8; // Whether or not if the creator actor is dead. bool creator_actor_dead = 9; + // Whether the placement group is persistent. + bool is_detached = 10; } message ScheduleData {