mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 09:45:24 +08:00
194 lines
5.8 KiB
Python
194 lines
5.8 KiB
Python
import logging
|
|
import pytest
|
|
import time
|
|
|
|
import ray
|
|
import ray.ray_constants as ray_constants
|
|
from ray.monitor import Monitor
|
|
from ray.cluster_utils import Cluster
|
|
from ray.test_utils import generate_system_config_map, SignalActor, \
|
|
new_scheduler_enabled
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def test_cluster():
|
|
"""Basic test for adding and removing nodes in cluster."""
|
|
g = Cluster(initialize_head=False)
|
|
node = g.add_node()
|
|
node2 = g.add_node()
|
|
assert node.remaining_processes_alive()
|
|
assert node2.remaining_processes_alive()
|
|
g.remove_node(node2)
|
|
g.remove_node(node)
|
|
assert not any(n.any_processes_alive() for n in [node, node2])
|
|
|
|
|
|
def test_shutdown():
|
|
g = Cluster(initialize_head=False)
|
|
node = g.add_node()
|
|
node2 = g.add_node()
|
|
g.shutdown()
|
|
assert not any(n.any_processes_alive() for n in [node, node2])
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"ray_start_cluster_head", [
|
|
generate_system_config_map(
|
|
num_heartbeats_timeout=20, object_timeout_milliseconds=12345)
|
|
],
|
|
indirect=True)
|
|
def test_system_config(ray_start_cluster_head):
|
|
"""Checks that the internal configuration setting works.
|
|
|
|
We set the cluster to timeout nodes after 2 seconds of no timeouts. We
|
|
then remove a node, wait for 1 second to check that the cluster is out
|
|
of sync, then wait another 2 seconds (giving 1 second of leeway) to check
|
|
that the client has timed out. We also check to see if the config is set.
|
|
"""
|
|
cluster = ray_start_cluster_head
|
|
worker = cluster.add_node()
|
|
cluster.wait_for_nodes()
|
|
|
|
@ray.remote
|
|
def f():
|
|
assert ray._config.object_timeout_milliseconds() == 12345
|
|
assert ray._config.num_heartbeats_timeout() == 20
|
|
|
|
ray.get([f.remote() for _ in range(5)])
|
|
|
|
cluster.remove_node(worker, allow_graceful=False)
|
|
time.sleep(1)
|
|
assert ray.cluster_resources()["CPU"] == 2
|
|
|
|
time.sleep(2)
|
|
assert ray.cluster_resources()["CPU"] == 1
|
|
|
|
|
|
def setup_monitor(address):
|
|
monitor = Monitor(
|
|
address, None, redis_password=ray_constants.REDIS_DEFAULT_PASSWORD)
|
|
monitor.update_raylet_map(_append_port=True)
|
|
monitor.subscribe(ray.ray_constants.AUTOSCALER_RESOURCE_REQUEST_CHANNEL)
|
|
return monitor
|
|
|
|
|
|
def verify_load_metrics(monitor, expected_resource_usage=None, timeout=30):
|
|
while True:
|
|
monitor.update_load_metrics()
|
|
monitor.process_messages()
|
|
resource_usage = monitor.load_metrics._get_resource_usage()
|
|
|
|
if "memory" in resource_usage[0]:
|
|
del resource_usage[0]["memory"]
|
|
if "object_store_memory" in resource_usage[1]:
|
|
del resource_usage[0]["object_store_memory"]
|
|
if "memory" in resource_usage[1]:
|
|
del resource_usage[1]["memory"]
|
|
if "object_store_memory" in resource_usage[1]:
|
|
del resource_usage[1]["object_store_memory"]
|
|
for key in list(resource_usage[0].keys()):
|
|
if key.startswith("node:"):
|
|
del resource_usage[0][key]
|
|
for key in list(resource_usage[1].keys()):
|
|
if key.startswith("node:"):
|
|
del resource_usage[1][key]
|
|
|
|
if expected_resource_usage is None:
|
|
if all(x for x in resource_usage[0:]):
|
|
break
|
|
elif all(x == y
|
|
for x, y in zip(resource_usage, expected_resource_usage)):
|
|
break
|
|
else:
|
|
timeout -= 1
|
|
time.sleep(1)
|
|
|
|
if timeout <= 0:
|
|
raise ValueError("Timeout. {} != {}".format(
|
|
resource_usage, expected_resource_usage))
|
|
|
|
return resource_usage
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"ray_start_cluster_head", [{
|
|
"num_cpus": 1,
|
|
}, {
|
|
"num_cpus": 2,
|
|
}],
|
|
indirect=True)
|
|
@pytest.mark.skipif(new_scheduler_enabled(), reason="fails")
|
|
def test_heartbeats_single(ray_start_cluster_head):
|
|
"""Unit test for `Cluster.wait_for_nodes`.
|
|
|
|
Test proper metrics.
|
|
"""
|
|
cluster = ray_start_cluster_head
|
|
monitor = setup_monitor(cluster.address)
|
|
total_cpus = ray.state.cluster_resources()["CPU"]
|
|
verify_load_metrics(monitor, ({"CPU": 0.0}, {"CPU": total_cpus}))
|
|
|
|
@ray.remote
|
|
def work(signal):
|
|
wait_signal = signal.wait.remote()
|
|
while True:
|
|
ready, not_ready = ray.wait([wait_signal], timeout=0)
|
|
if len(ready) == 1:
|
|
break
|
|
time.sleep(1)
|
|
|
|
signal = SignalActor.remote()
|
|
|
|
work_handle = work.remote(signal)
|
|
verify_load_metrics(monitor, ({"CPU": 1.0}, {"CPU": total_cpus}))
|
|
|
|
ray.get(signal.send.remote())
|
|
ray.get(work_handle)
|
|
|
|
@ray.remote
|
|
class Actor:
|
|
def work(self, signal):
|
|
wait_signal = signal.wait.remote()
|
|
while True:
|
|
ready, not_ready = ray.wait([wait_signal], timeout=0)
|
|
if len(ready) == 1:
|
|
break
|
|
time.sleep(1)
|
|
|
|
signal = SignalActor.remote()
|
|
|
|
test_actor = Actor.remote()
|
|
work_handle = test_actor.work.remote(signal)
|
|
|
|
verify_load_metrics(monitor, ({"CPU": 1.0}, {"CPU": total_cpus}))
|
|
|
|
ray.get(signal.send.remote())
|
|
ray.get(work_handle)
|
|
|
|
|
|
def test_wait_for_nodes(ray_start_cluster_head):
|
|
"""Unit test for `Cluster.wait_for_nodes`.
|
|
|
|
Adds 4 workers, waits, then removes 4 workers, waits,
|
|
then adds 1 worker, waits, and removes 1 worker, waits.
|
|
"""
|
|
cluster = ray_start_cluster_head
|
|
workers = [cluster.add_node() for i in range(4)]
|
|
cluster.wait_for_nodes()
|
|
[cluster.remove_node(w) for w in workers]
|
|
cluster.wait_for_nodes()
|
|
|
|
assert ray.cluster_resources()["CPU"] == 1
|
|
worker2 = cluster.add_node()
|
|
cluster.wait_for_nodes()
|
|
cluster.remove_node(worker2)
|
|
cluster.wait_for_nodes()
|
|
assert ray.cluster_resources()["CPU"] == 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import pytest
|
|
import sys
|
|
sys.exit(pytest.main(["-v", __file__]))
|