[Placement Group]Add get all placement group api (#11460)

* add get all interface for placement group

* add get all interface for placement group

* make it work

* fix lint

* fix lint

* fix comment

* add cpp test

* fix python lint
This commit is contained in:
DK.Pino
2020-10-24 02:46:48 +08:00
committed by GitHub
parent e7aa6441b7
commit 9f804ade5f
18 changed files with 157 additions and 19 deletions
@@ -31,3 +31,4 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
c_bool AddWorkerInfo(const c_string &serialized_string)
unique_ptr[c_string] GetPlacementGroupInfo(
const CPlacementGroupID &placement_group_id)
c_vector[c_string] GetAllPlacementGroupInfo()
@@ -122,6 +122,12 @@ cdef class GlobalStateAccessor:
result = self.inner.get().AddWorkerInfo(cserialized_string)
return result
def get_placement_group_table(self):
cdef c_vector[c_string] result
with nogil:
result = self.inner.get().GetAllPlacementGroupInfo()
return result
def get_placement_group_info(self, placement_group_id):
cdef unique_ptr[c_string] result
cdef CPlacementGroupID cplacement_group_id = (
+12 -2
View File
@@ -388,8 +388,18 @@ class GlobalState:
FromString(placement_group_info))
return self._gen_placement_group_info(placement_group_info)
else:
raise NotImplementedError(
"Get all placement group is not implemented yet.")
placement_group_table = self.global_state_accessor.\
get_placement_group_table()
results = {}
for placement_group_info in placement_group_table:
placement_group_table_data = gcs_utils.\
PlacementGroupTableData.FromString(placement_group_info)
placement_group_id = binary_to_hex(
placement_group_table_data.placement_group_id)
results[placement_group_id] = \
self._gen_placement_group_info(placement_group_table_data)
return results
def _gen_placement_group_info(self, placement_group_info):
# This should be imported here, otherwise, it will error doc build.
+23
View File
@@ -406,6 +406,7 @@ def test_placement_group_table(ray_start_cluster):
# Now the placement group should be scheduled.
cluster.add_node(num_cpus=5, num_gpus=1)
cluster.wait_for_nodes()
actor_1 = Actor.options(
placement_group=placement_group,
@@ -415,6 +416,28 @@ def test_placement_group_table(ray_start_cluster):
result = ray.util.placement_group_table(placement_group)
assert result["state"] == "CREATED"
# Add tow more placement group for placement group table test.
second_strategy = "SPREAD"
ray.util.placement_group(
name="second_placement_group",
strategy=second_strategy,
bundles=bundles)
ray.util.placement_group(
name="third_placement_group",
strategy=second_strategy,
bundles=bundles)
placement_group_table = ray.util.placement_group_table()
assert len(placement_group_table) == 3
true_name_set = {"name", "second_placement_group", "third_placement_group"}
get_name_set = set()
for _, placement_group_data in placement_group_table.items():
get_name_set.add(placement_group_data["name"])
assert true_name_set == get_name_set
def test_cuda_visible_devices(ray_start_cluster):
@ray.remote(num_gpus=1)
+4 -3
View File
@@ -185,17 +185,18 @@ def remove_placement_group(placement_group: PlacementGroup):
worker.core_worker.remove_placement_group(placement_group.id)
def placement_group_table(placement_group: PlacementGroup) -> dict:
def placement_group_table(placement_group: PlacementGroup = None) -> list:
"""Get the state of the placement group from GCS.
Args:
placement_group (PlacementGroup): placement group to see
states.
"""
assert placement_group is not None
worker = ray.worker.global_worker
worker.check_connected()
return ray.state.state.placement_group_table(placement_group.id)
placement_group_id = placement_group.id if (placement_group is
not None) else None
return ray.state.state.placement_group_table(placement_group_id)
def get_current_placement_group() -> Optional[PlacementGroup]: