mirror of
https://github.com/wassname/ray.git
synced 2026-07-04 05:01:48 +08:00
[docs] Placement group documentation (#10555)
Co-authored-by: Richard Liaw <rliaw@berkeley.edu>
This commit is contained in:
committed by
Barak Michener
parent
a6a7886529
commit
c79eb7984d
@@ -42,7 +42,6 @@ MOCK_MODULES = [
|
||||
"kubernetes",
|
||||
"mxnet.model",
|
||||
"psutil",
|
||||
"ray._raylet",
|
||||
"ray.core.generated",
|
||||
"ray.core.generated.common_pb2",
|
||||
"ray.core.generated.gcs_pb2",
|
||||
|
||||
@@ -205,6 +205,7 @@ Papers
|
||||
walkthrough.rst
|
||||
using-ray.rst
|
||||
configure.rst
|
||||
ray-dashboard.rst
|
||||
Tutorial and Examples <auto_examples/overview.rst>
|
||||
package-ref.rst
|
||||
|
||||
|
||||
@@ -124,12 +124,40 @@ ray.available_resources
|
||||
.. autofunction:: ray.available_resources
|
||||
|
||||
ray.cross_language
|
||||
~~~~~~~~~~~~~~~~~~~~~~~
|
||||
~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. autofunction:: ray.java_function
|
||||
|
||||
.. autofunction:: ray.java_actor_class
|
||||
|
||||
.. _ray-placement-group-ref:
|
||||
|
||||
Placement Group APIs
|
||||
--------------------
|
||||
|
||||
placement_group
|
||||
~~~~~~~~~~~~~~~
|
||||
|
||||
.. autofunction:: ray.util.placement_group.placement_group
|
||||
|
||||
|
||||
PlacementGroup (class)
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. autoclass:: ray.util.placement_group.PlacementGroup
|
||||
:members:
|
||||
|
||||
placement_group_table
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. autofunction:: ray.util.placement_group.placement_group_table
|
||||
|
||||
|
||||
remove_placement_group
|
||||
~~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
.. autofunction:: ray.util.placement_group.remove_placement_group
|
||||
|
||||
Experimental APIs
|
||||
-----------------
|
||||
|
||||
|
||||
@@ -0,0 +1,201 @@
|
||||
Placement Groups
|
||||
================
|
||||
|
||||
Placement groups allow users to atomically reserve groups of resources across multiple nodes (i.e., gang scheduling). They can be then used to schedule Ray tasks and actors to be packed as close as possible for locality (PACK), or spread apart (SPREAD).
|
||||
|
||||
Here are some use cases:
|
||||
|
||||
- **Gang Scheduling**: Your application requires all tasks/actors to be scheduled and start at the same time.
|
||||
- **Maximizing data locality**: You'd like to place or schedule your tasks and actors close to your data to avoid object transfer overheads.
|
||||
- **Load balancing**: To improve application availability and avoid resource overload, you'd like to place your actors or tasks into different physical machines as much as possible.
|
||||
|
||||
|
||||
Key Concepts
|
||||
------------
|
||||
|
||||
A **bundle** is a collection of "resources", i.e. {"GPU": 4}.
|
||||
|
||||
- A bundle must be able to fit on a single node on the Ray cluster.
|
||||
- Bundles are then placed according to the "placement group strategy" across nodes on the cluster.
|
||||
|
||||
|
||||
A **placement group** is a collection of bundles.
|
||||
|
||||
- Each bundle is given an "index" within the placement group
|
||||
- Bundles are then placed according to the "placement group strategy" across nodes on the cluster.
|
||||
- After the placement group is created, tasks or actors can be then scheduled according to the placement group and even on individual bundles.
|
||||
|
||||
|
||||
A **placement group strategy** is an algorithm for selecting nodes for bundle placement. Read more about :ref:`placement strategies <pgroup-strategy>`.
|
||||
|
||||
|
||||
Starting a placement group
|
||||
--------------------------
|
||||
|
||||
Ray placement group can be created via the ``ray.util.placement_group`` API. Placement groups take in a list of bundles and a :ref:`placement strategy <pgroup-strategy>`:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Import placement group APIs.
|
||||
from ray.util.placement_group import (
|
||||
placement_group,
|
||||
placement_group_table,
|
||||
remove_placement_group
|
||||
)
|
||||
|
||||
# Initialize Ray.
|
||||
import ray
|
||||
ray.init(num_gpus=2, resources={"extra_resource": 2})
|
||||
|
||||
bundle1 = {"GPU": 2}
|
||||
bundle2 = {"extra_resource": 2}
|
||||
|
||||
pg = placement_group([bundle1, bundle2], strategy="STRICT_PACK")
|
||||
|
||||
.. important:: Each bundle must be able to fit on a single node on the Ray cluster.
|
||||
|
||||
Placement groups are atomically created - meaning that if there exists a bundle that cannot fit in any of the current nodes, then the entire placement group will not be ready.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Wait until placement group is created.
|
||||
ray.get(pg.ready())
|
||||
|
||||
# You can also use ray.wait.
|
||||
ready, unready = ray.wait([pg.ready()], timeout=0)
|
||||
|
||||
# You can look at placement group states using this API.
|
||||
pprint(placement_group_table(pg))
|
||||
|
||||
Infeasible placement groups will be pending until resources are available. The Ray Autoscaler will be aware of placement groups, and auto-scale the cluster to ensure pending groups can be placed as needed.
|
||||
|
||||
.. _pgroup-strategy:
|
||||
|
||||
Strategy types
|
||||
--------------
|
||||
|
||||
Ray currently supports the following placement group strategies:
|
||||
|
||||
**STRICT_PACK**
|
||||
|
||||
All bundles must be placed into a single node on the cluster.
|
||||
|
||||
**PACK**
|
||||
|
||||
All provided bundles are packed onto a single node on a best-effort basis.
|
||||
If strict packing is not feasible (i.e., some bundles do not fit on the node), bundles can be placed onto other nodes nodes.
|
||||
|
||||
**STRICT_SPREAD**
|
||||
|
||||
Each bundle must be scheduled in a separate node.
|
||||
|
||||
**SPREAD**
|
||||
|
||||
Each bundle will be spread onto separate nodes on a best effort basis.
|
||||
If strict spreading is not feasible, bundles can be placed overlapping nodes.
|
||||
|
||||
Quick Start
|
||||
-----------
|
||||
|
||||
Let's see an example of using placement group. Note that this example is done within a single node.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import ray
|
||||
from pprint import pprint
|
||||
|
||||
# Import placement group APIs.
|
||||
from ray.util.placement_group import (
|
||||
placement_group,
|
||||
placement_group_table,
|
||||
remove_placement_group
|
||||
)
|
||||
|
||||
ray.init(num_gpus=2, resources={"extra_resource": 2})
|
||||
|
||||
Let's create a placement group. Recall that each bundle is a collection of resources, and tasks or actors can be scheduled on each bundle.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
gpu_bundle = {"GPU": 2}
|
||||
extra_resource_bundle = {"extra_resource": 2}
|
||||
|
||||
# Reserve bundles with strict pack strategy.
|
||||
# It means Ray will reserve 2 "GPU" and 2 "extra_resource" on the same node (strict pack) within a Ray cluster.
|
||||
# Using this placement group for scheduling actors or tasks will guarantee that they will
|
||||
# be colocated on the same node.
|
||||
pg = placement_group([gpu_bundle, extra_resource_bundle], strategy="STRICT_PACK")
|
||||
|
||||
# Wait until placement group is created.
|
||||
ray.get(pg.ready())
|
||||
|
||||
Now let's define an actor that uses GPU. We'll also define a task that use ``extra_resources``.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
@ray.remote(num_gpus=1)
|
||||
class GPUActor:
|
||||
def __init__(self):
|
||||
pass
|
||||
|
||||
@ray.remote(resources={"extra_resource": 1})
|
||||
def extra_resource_task():
|
||||
import time
|
||||
# simulate long-running task.
|
||||
time.sleep(10)
|
||||
|
||||
# Create GPU actors on a gpu bundle.
|
||||
gpu_actors = [GPUActor.options(
|
||||
placement_group=pg,
|
||||
# This is the index from the original list.
|
||||
placement_group_bundle_index=0) # Index of gpu_bundle is 0.
|
||||
.remote() for _ in range(2)]
|
||||
|
||||
# Create extra_resource actors on a extra_resource bundle.
|
||||
extra_resource_actors = [extra_resource_task.options(
|
||||
placement_group=pg,
|
||||
# This is the index from the original list.
|
||||
placement_group_bundle_index=1) # Index of extra_resource_bundle is 1.
|
||||
.remote() for _ in range(2)]
|
||||
|
||||
Now, you can guarantee all gpu actors and extra_resource tasks are located on the same node
|
||||
because they are scheduled on a placement group with the STRICT_PACK strategy.
|
||||
|
||||
Note that you must remove the placement group once you are finished with your application.
|
||||
Workers of actors and tasks that are scheduled on placement group will be all killed:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# This API is asynchronous.
|
||||
remove_placement_group(pg)
|
||||
|
||||
# Wait until placement group is killed.
|
||||
import time
|
||||
time.sleep(1)
|
||||
# Check the placement group has died.
|
||||
pprint(placement_group_table(pg))
|
||||
|
||||
"""
|
||||
{'bundles': {0: {'GPU': 2.0}, 1: {'extra_resource': 2.0}},
|
||||
'name': 'unnamed_group',
|
||||
'placement_group_id': '40816b6ad474a6942b0edb45809b39c3',
|
||||
'state': 'REMOVED',
|
||||
'strategy': 'STRICT_PACK'}
|
||||
"""
|
||||
|
||||
ray.shutdown()
|
||||
|
||||
Lifecycle
|
||||
---------
|
||||
|
||||
When placement group is first created, the request is sent to the GCS. The GCS reserve resources to nodes based on its scheduling strategy. Ray guarantees the atomic creation of placement group.
|
||||
|
||||
Placement groups are pending creation if there are no nodes that can satisfy resource requirements for a given strategy. The Ray Autoscaler will be aware of placement groups, and auto-scale the cluster to ensure pending groups can be placed as needed.
|
||||
|
||||
If nodes that contain some bundles of a placement group die, bundles will be rescheduled on different nodes by GCS. This means that the initial creation of placement group is "atomic", but once it is created, there could be partial placement groups.
|
||||
|
||||
Unlike actors and tasks, placement group is currently not fault tolerant yet. It is in progress.
|
||||
|
||||
API Reference
|
||||
-------------
|
||||
:ref:`Placement Group API reference <ray-placement-group-ref>`
|
||||
@@ -16,8 +16,8 @@ Finally, we've also included some content on using core Ray APIs with `Tensorflo
|
||||
using-ray-with-gpus.rst
|
||||
serialization.rst
|
||||
memory-management.rst
|
||||
placement-group.rst
|
||||
troubleshooting.rst
|
||||
ray-dashboard.rst
|
||||
advanced.rst
|
||||
cross-language.rst
|
||||
using-ray-with-tensorflow.rst
|
||||
|
||||
+8
-1
@@ -407,8 +407,15 @@ class GlobalState:
|
||||
def get_strategy(strategy):
|
||||
if strategy == PlacementStrategy.PACK:
|
||||
return "PACK"
|
||||
else:
|
||||
elif strategy == PlacementStrategy.STRICT_PACK:
|
||||
return "STRICT_PACK"
|
||||
elif strategy == PlacementStrategy.STRICT_SPREAD:
|
||||
return "STRICT_SPREAD"
|
||||
elif strategy == PlacementStrategy.SPREAD:
|
||||
return "SPREAD"
|
||||
else:
|
||||
raise ValueError(
|
||||
f"Invalid strategy returned: {PlacementStrategy}")
|
||||
|
||||
assert placement_group_info is not None
|
||||
return {
|
||||
|
||||
@@ -5,12 +5,7 @@ from ray._raylet import PlacementGroupID, ObjectRef
|
||||
|
||||
|
||||
class PlacementGroup:
|
||||
"""A handle to a placement group.
|
||||
|
||||
Args:
|
||||
id: Placement group id.
|
||||
bundles: List of bundles.
|
||||
"""
|
||||
"""A handle to a placement group."""
|
||||
|
||||
@staticmethod
|
||||
def empty():
|
||||
@@ -21,7 +16,19 @@ class PlacementGroup:
|
||||
self.bundles = bundles
|
||||
|
||||
def ready(self) -> ObjectRef:
|
||||
"""Returns an object ID to check ready status."""
|
||||
"""Returns an ObjectRef to check ready status.
|
||||
|
||||
This API runs a small dummy task to wait for placement group creation.
|
||||
It is compatible to ray.get and ray.wait.
|
||||
|
||||
Example:
|
||||
|
||||
>>> pg = placement_group([{"CPU": 1}])
|
||||
ray.get(pg.ready())
|
||||
|
||||
>>> pg = placement_group([{"CPU": 1}])
|
||||
ray.wait([pg.ready()], timeout=0)
|
||||
"""
|
||||
worker = ray.worker.global_worker
|
||||
worker.check_connected()
|
||||
|
||||
@@ -57,6 +64,11 @@ class PlacementGroup:
|
||||
placement_group_bundle_index=bundle_index,
|
||||
resources=resources).remote(self)
|
||||
|
||||
@property
|
||||
def bundle_specs(self) -> List[Dict]:
|
||||
"""List[Dict]: Return bundles belonging to this placement group."""
|
||||
return self.bundles
|
||||
|
||||
@property
|
||||
def bundle_count(self):
|
||||
return len(self.bundles)
|
||||
@@ -71,21 +83,24 @@ class PlacementGroup:
|
||||
|
||||
def placement_group(bundles: List[Dict[str, float]],
|
||||
strategy: str = "PACK",
|
||||
name: str = "unnamed_group"):
|
||||
"""
|
||||
Create a placement group.
|
||||
|
||||
This method is the api to create placement group.
|
||||
name: str = "unnamed_group") -> PlacementGroup:
|
||||
"""Asynchronously creates a PlacementGroup.
|
||||
|
||||
Args:
|
||||
bundles: A list of bundles which represent the resources needed.
|
||||
strategy: The strategy to create the placement group.
|
||||
PACK: Packs Bundles into as few nodes as possible.
|
||||
SPREAD: Places Bundles across distinct nodes as even as possible.
|
||||
STRICT_PACK: Packs Bundles into one node.
|
||||
STRICT_SPREAD: Packs Bundles across distinct nodes.
|
||||
The group is not allowed to span multiple nodes.
|
||||
name: The name of the placement group.
|
||||
bundles(List[Dict]): A list of bundles which
|
||||
represent the resources requirements.
|
||||
strategy(str): The strategy to create the placement group.
|
||||
|
||||
- "PACK": Packs Bundles into as few nodes as possible.
|
||||
- "SPREAD": Places Bundles across distinct nodes as even as possible.
|
||||
- "STRICT_PACK": Packs Bundles into one node. The group is
|
||||
not allowed to span multiple nodes.
|
||||
- "STRICT_SPREAD": Packs Bundles across distinct nodes.
|
||||
|
||||
name(str): The name of the placement group.
|
||||
|
||||
Return:
|
||||
PlacementGroup: Placement group object.
|
||||
"""
|
||||
worker = ray.worker.global_worker
|
||||
worker.check_connected()
|
||||
@@ -108,7 +123,12 @@ def placement_group(bundles: List[Dict[str, float]],
|
||||
return PlacementGroup(placement_group_id, bundles)
|
||||
|
||||
|
||||
def remove_placement_group(placement_group):
|
||||
def remove_placement_group(placement_group: PlacementGroup):
|
||||
"""Asynchronously remove placement group.
|
||||
|
||||
Args:
|
||||
placement_group (PlacementGroup): The placement group to delete.
|
||||
"""
|
||||
assert placement_group is not None
|
||||
worker = ray.worker.global_worker
|
||||
worker.check_connected()
|
||||
@@ -116,14 +136,21 @@ def remove_placement_group(placement_group):
|
||||
worker.core_worker.remove_placement_group(placement_group.id)
|
||||
|
||||
|
||||
def placement_group_table(placement_group):
|
||||
def placement_group_table(placement_group: PlacementGroup) -> dict:
|
||||
"""Get the state of the placement group from GCS.
|
||||
|
||||
Args:
|
||||
placement_group (PlacementGroup): placement group to see
|
||||
states.
|
||||
"""
|
||||
assert placement_group is not None
|
||||
worker = ray.worker.global_worker
|
||||
worker.check_connected()
|
||||
return ray.state.state.placement_group_table(placement_group.id)
|
||||
|
||||
|
||||
def check_placement_group_index(placement_group, bundle_index):
|
||||
def check_placement_group_index(placement_group: PlacementGroup,
|
||||
bundle_index: int):
|
||||
assert placement_group is not None
|
||||
if placement_group.id.is_nil():
|
||||
if bundle_index != -1:
|
||||
|
||||
Reference in New Issue
Block a user