mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 15:06:28 +08:00
[Core] Report worker backlog in GCS heartbeat (#11039)
This commit is contained in:
@@ -295,6 +295,62 @@ def test_placement_group_load_report(ray_start_cluster):
|
||||
client.close()
|
||||
|
||||
|
||||
def test_backlog_report(shutdown_only):
    """Check that a worker's task backlog shows up in GCS heartbeat batches.

    Starts a single-CPU cluster with ``report_worker_backlog`` enabled,
    submits more tasks than can run at once, and polls the heartbeat batch
    pub/sub channel until a resource demand with a positive ``backlog_size``
    is observed.

    Args:
        shutdown_only: Test fixture; not used directly in the body
            (presumably it shuts Ray down after the test -- confirm against
            the fixture definition).

    Raises:
        RuntimeError: Presumably raised by ``wait_for_condition`` when no
            qualifying heartbeat arrives within the timeout -- confirm
            against ``ray.test_utils``.
    """
    cluster = ray.init(
        num_cpus=1, _system_config={
            "report_worker_backlog": True,
        })
    redis_client = ray._private.services.create_redis_client(
        cluster["redis_address"],
        password=ray.ray_constants.REDIS_DEFAULT_PASSWORD)
    client = redis_client.pubsub(ignore_subscribe_messages=True)
    client.psubscribe(ray.gcs_utils.XRAY_HEARTBEAT_BATCH_PATTERN)

    @ray.remote(num_cpus=1)
    def foo(x):
        print(".")
        time.sleep(x)
        return None

    def backlog_size_set():
        """Return True once a heartbeat reports a positive backlog size.

        Consumes at most one pub/sub message per call; returns False when
        no message is available, the read fails, or the message does not
        carry exactly one resource demand.
        """
        # Deliberately best-effort: this predicate is polled repeatedly, so
        # a failed read is treated as "not yet" rather than a test failure.
        try:
            raw_message = client.get_message()
        except Exception:
            return False
        if raw_message is None:
            return False

        data = raw_message["data"]
        pub_message = ray.gcs_utils.PubSubMessage.FromString(data)
        heartbeat_data = pub_message.data

        message = ray.gcs_utils.HeartbeatBatchTableData.FromString(
            heartbeat_data)
        aggregate_resource_load = \
            message.resource_load_by_shape.resource_demands
        if len(aggregate_resource_load) == 1:
            backlog_size = aggregate_resource_load[0].backlog_size
            print(backlog_size)
            # Ideally we'd want to assert backlog_size == 8, but guaranteeing
            # the order that submissions will occur is too hard/flaky.
            return backlog_size > 0
        return False

    try:
        # We want this first task to finish
        refs = [foo.remote(0.5)]
        # These tasks should all start _before_ the first one finishes.
        refs.extend([foo.remote(1000) for _ in range(9)])
        # Now there's 1 request running, 1 queued in the raylet, and 8
        # queued in the worker backlog.

        ray.get(refs[0])
        # First request finishes, second request is now running, third lease
        # request is sent to the raylet with backlog=7

        ray.test_utils.wait_for_condition(backlog_size_set, timeout=2)
    finally:
        # Release the pub/sub connection even if the wait times out, matching
        # the explicit client.close() at the end of the preceding test.
        client.close()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
import pytest
|
||||
import sys
|
||||
|
||||
Reference in New Issue
Block a user