[Placement Group] Report placement group load through heartbeat. (#11129)

* In progress.

* Fix a minor issue.

* Removed unnecessary comments.

* Addressed code review.

* Fix build failure.

* remove stray logs.

* Move global state to a med size test to avoid windows CI breakage.
This commit is contained in:
SangBin Cho
2020-10-04 16:47:22 -07:00
committed by GitHub
parent 035a4ad0a2
commit 80cc161f3e
10 changed files with 182 additions and 7 deletions
+1 -1
View File
@@ -29,6 +29,7 @@ py_test_module_list(
"test_error_ray_not_initialized.py",
"test_gcs_fault_tolerance.py",
"test_global_gc.py",
"test_global_state.py",
"test_iter.py",
"test_joblib.py",
"test_resource_demand_scheduler.py",
@@ -81,7 +82,6 @@ py_test_module_list(
"test_dask_scheduler.py",
"test_dask_callback.py",
"test_debug_tools.py",
"test_global_state.py",
"test_job.py",
"test_memstat.py",
"test_metrics_agent.py",
+81
View File
@@ -212,6 +212,87 @@ def test_load_report(shutdown_only, max_shapes):
else:
assert demand.num_ready_requests_queued > 0
assert demand.num_infeasible_requests_queued == 0
client.close()
def test_placement_group_load_report(ray_start_cluster):
cluster = ray_start_cluster
# Add a head node that doesn't have gpu resource.
cluster.add_node(num_cpus=4)
ray.init(address=cluster.address)
redis = ray._private.services.create_redis_client(
cluster.address, password=ray.ray_constants.REDIS_DEFAULT_PASSWORD)
redis = ray._private.services.create_redis_client(
cluster.address, password=ray.ray_constants.REDIS_DEFAULT_PASSWORD)
client = redis.pubsub(ignore_subscribe_messages=True)
client.psubscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN)
class PgLoadChecker:
def nothing_is_ready(self):
heartbeat = self._read_heartbeat()
if not heartbeat:
return False
if heartbeat.HasField("placement_group_load"):
pg_load = heartbeat.placement_group_load
return len(pg_load.placement_group_data) == 2
return False
def only_first_one_ready(self):
heartbeat = self._read_heartbeat()
if not heartbeat:
return False
if heartbeat.HasField("placement_group_load"):
pg_load = heartbeat.placement_group_load
return len(pg_load.placement_group_data) == 1
return False
def two_infeasible_pg(self):
heartbeat = self._read_heartbeat()
if not heartbeat:
return False
if heartbeat.HasField("placement_group_load"):
pg_load = heartbeat.placement_group_load
return len(pg_load.placement_group_data) == 2
return False
def _read_heartbeat(self):
try:
message = client.get_message()
except redis.exceptions.ConnectionError:
pass
if message is None:
return None
pattern = message["pattern"]
data = message["data"]
if pattern != ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN:
return None
pub_message = ray.gcs_utils.PubSubMessage.FromString(data)
heartbeat_data = pub_message.data
heartbeat = ray.gcs_utils.HeartbeatBatchTableData.FromString(
heartbeat_data)
return heartbeat
checker = PgLoadChecker()
# Create 2 placement groups that are infeasible.
pg_feasible = ray.util.placement_group([{"A": 1}])
pg_infeasible = ray.util.placement_group([{"B": 1}])
_, unready = ray.wait(
[pg_feasible.ready(), pg_infeasible.ready()], timeout=0)
assert len(unready) == 2
ray.test_utils.wait_for_condition(checker.nothing_is_ready)
# Add a node that makes pg feasible. Make sure load include this change.
cluster.add_node(resources={"A": 1})
ray.get(pg_feasible.ready())
ray.test_utils.wait_for_condition(checker.only_first_one_ready)
# Create one more infeasible pg and make sure load is properly updated.
pg_infeasible_second = ray.util.placement_group([{"C": 1}])
_, unready = ray.wait([pg_infeasible_second.ready()], timeout=0)
assert len(unready) == 1
ray.test_utils.wait_for_condition(checker.two_infeasible_pg)
client.close()
if __name__ == "__main__":