[Placement Group] Support Placement Group state table. (#10090)

* Done.

* Addressed code review.

* Linting.

* Fix lint.

* Fix lint.

* Fix a test.

* Lint.

* Add a lint sleep to test.

* Fix the lint issue.

* Fixed doc build error.
This commit is contained in:
SangBin Cho
2020-08-17 09:24:50 -07:00
committed by GitHub
parent edd783bc32
commit 053188dfbe
23 changed files with 252 additions and 53 deletions
+3 -8
View File
@@ -1,13 +1,8 @@
from .api import get, wait
from .dynamic_resources import set_resource
from .object_spilling import force_spill_objects, force_restore_spilled_objects
from .placement_group import (
placement_group, )
from .placement_group import (placement_group, placement_group_table)
__all__ = [
"get",
"wait",
"set_resource",
"force_spill_objects",
"force_restore_spilled_objects",
"placement_group",
"get", "wait", "set_resource", "force_spill_objects",
"force_restore_spilled_objects", "placement_group", "placement_group_table"
]
@@ -30,3 +30,10 @@ def placement_group(bundles: List[Dict[str, float]],
name, bundles, strategy)
return placement_group_id
def placement_group_table(placement_group_id):
assert placement_group_id is not None
worker = ray.worker.global_worker
worker.check_connected()
return ray.state.state.placement_group_table(placement_group_id)
+13 -42
View File
@@ -1,48 +1,19 @@
from ray.core.generated.gcs_pb2 import (
ActorCheckpointIdData,
ActorTableData,
GcsNodeInfo,
JobTableData,
JobConfig,
ErrorTableData,
ErrorType,
GcsEntry,
HeartbeatBatchTableData,
HeartbeatTableData,
ObjectTableData,
ProfileTableData,
TablePrefix,
TablePubsub,
TaskTableData,
ResourceMap,
ResourceTableData,
ObjectLocationInfo,
PubSubMessage,
WorkerTableData,
)
ActorCheckpointIdData, ActorTableData, GcsNodeInfo, JobTableData,
JobConfig, ErrorTableData, ErrorType, GcsEntry, HeartbeatBatchTableData,
HeartbeatTableData, ObjectTableData, ProfileTableData, TablePrefix,
TablePubsub, TaskTableData, ResourceMap, ResourceTableData,
ObjectLocationInfo, PubSubMessage, WorkerTableData,
PlacementGroupTableData)
__all__ = [
"ActorCheckpointIdData",
"ActorTableData",
"GcsNodeInfo",
"JobTableData",
"JobConfig",
"ErrorTableData",
"ErrorType",
"GcsEntry",
"HeartbeatBatchTableData",
"HeartbeatTableData",
"ObjectTableData",
"ProfileTableData",
"TablePrefix",
"TablePubsub",
"TaskTableData",
"ResourceMap",
"ResourceTableData",
"construct_error_message",
"ObjectLocationInfo",
"PubSubMessage",
"WorkerTableData",
"ActorCheckpointIdData", "ActorTableData", "GcsNodeInfo", "JobTableData",
"JobConfig", "ErrorTableData", "ErrorType", "GcsEntry",
"HeartbeatBatchTableData", "HeartbeatTableData", "ObjectTableData",
"ProfileTableData", "TablePrefix", "TablePubsub", "TaskTableData",
"ResourceMap", "ResourceTableData", "construct_error_message",
"ObjectLocationInfo", "PubSubMessage", "WorkerTableData",
"PlacementGroupTableData"
]
FUNCTION_PREFIX = "RemoteFunction:"
@@ -7,6 +7,7 @@ from ray.includes.unique_ids cimport (
CClientID,
CObjectID,
CWorkerID,
CPlacementGroupID,
)
cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
@@ -27,3 +28,5 @@ cdef extern from "ray/gcs/gcs_client/global_state_accessor.h" nogil:
unique_ptr[c_string] GetWorkerInfo(const CWorkerID &worker_id)
c_vector[c_string] GetAllWorkerInfo()
c_bool AddWorkerInfo(const c_string &serialized_string)
unique_ptr[c_string] GetPlacementGroupInfo(
const CPlacementGroupID &placement_group_id)
@@ -3,6 +3,7 @@ from ray.includes.unique_ids cimport (
CClientID,
CObjectID,
CWorkerID,
CPlacementGroupID
)
from ray.includes.global_state_accessor cimport (
@@ -114,3 +115,14 @@ cdef class GlobalStateAccessor:
with nogil:
result = self.inner.get().AddWorkerInfo(cserialized_string)
return result
def get_placement_group_info(self, placement_group_id):
cdef unique_ptr[c_string] result
cdef CPlacementGroupID cplacement_group_id = (
CPlacementGroupID.FromBinary(placement_group_id.binary()))
with nogil:
result = self.inner.get().GetPlacementGroupInfo(
cplacement_group_id)
if result:
return c_string(result.get().data(), result.get().size())
return None
+51
View File
@@ -377,6 +377,57 @@ class GlobalState:
return dict(result)
# SANG-TODO Add functions.
def placement_group_table(self, placement_group_id=None):
self._check_connected()
if placement_group_id is not None:
placement_group_id = ray.PlacementGroupID(
hex_to_binary(placement_group_id.hex()))
placement_group_info = (
self.global_state_accessor.get_placement_group_info(
placement_group_id))
if placement_group_info is None:
return {}
else:
placement_group_info = (gcs_utils.PlacementGroupTableData.
FromString(placement_group_info))
return self._gen_placement_group_info(placement_group_info)
else:
raise NotImplementedError(
"Get all placement group is not implemented yet.")
def _gen_placement_group_info(self, placement_group_info):
# This should be imported here, otherwise, it will error doc build.
from ray.core.generated.common_pb2 import PlacementStrategy
def get_state(state):
if state == ray.gcs_utils.PlacementGroupTableData.PENDING:
return "PENDING"
elif state == ray.gcs_utils.PlacementGroupTableData.ALIVE:
return "ALIVE"
else:
return "DEAD"
def get_strategy(strategy):
if strategy == PlacementStrategy.PACK:
return "PACK"
else:
return "SPREAD"
assert placement_group_info is not None
return {
"placement_group_id": binary_to_hex(
placement_group_info.placement_group_id),
"name": placement_group_info.name,
"bundles": {
bundle.bundle_id.bundle_index: bundle.unit_resources
for bundle in placement_group_info.bundles
},
"strategy": get_strategy(placement_group_info.strategy),
"state": get_state(placement_group_info.state),
}
def _seconds_to_microseconds(self, time_in_seconds):
"""A helper function for converting seconds to microseconds."""
time_in_microseconds = 10**6 * time_in_seconds
+41
View File
@@ -219,6 +219,47 @@ def test_placement_group_hang(ray_start_cluster):
assert "CPU_group_" in list(resources.keys())[0], resources
def test_placement_group_table(ray_start_cluster):
@ray.remote(num_cpus=2)
class Actor(object):
def __init__(self):
self.n = 0
def value(self):
return self.n
cluster = ray_start_cluster
num_nodes = 2
for _ in range(num_nodes):
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
# Originally placement group creation should be pending because
# there are no resources.
name = "name"
strategy = "PACK"
bundles = [{"CPU": 2, "GPU": 1}, {"CPU": 2}]
placement_group_id = ray.experimental.placement_group(
name=name, strategy=strategy, bundles=bundles)
result = ray.experimental.placement_group_table(placement_group_id)
assert result["name"] == name
assert result["strategy"] == strategy
for i in range(len(bundles)):
assert bundles[i] == result["bundles"][i]
assert result["state"] == "PENDING"
# Now the placement group should be scheduled.
cluster.add_node(num_cpus=5, num_gpus=1)
cluster.wait_for_nodes()
actor_1 = Actor.options(
placement_group_id=placement_group_id,
placement_group_bundle_index=0).remote()
ray.get(actor_1.value.remote())
result = ray.experimental.placement_group_table(placement_group_id)
assert result["state"] == "ALIVE"
def test_cuda_visible_devices(ray_start_cluster):
@ray.remote(num_gpus=1)
def f():