[core] Add Global State Test for multi-node setting (#3239)

* add test for adding node

* multinode test fixes

* First pass at allowing updatable values

* Fix compilation issues

* Add config file parsing

* Full initialization

* Wrote a good test

* configuration parsing and stuff

* docs

* write some tests, make it good

* fixed init

* Add all config options and bring back stress tests.

* Update python/ray/worker.py

* Update python/ray/worker.py

* Fix internalization

* some last changes

* Linting and Java fix

* add docstring

* Fix test, add assertions

* pytest ext

* lint

* lint
This commit is contained in:
Richard Liaw
2018-11-13 10:35:24 -08:00
committed by Peter Schafhalter
parent d90f365394
commit c0423db05c
3 changed files with 67 additions and 12 deletions
+6 -1
View File
@@ -118,12 +118,17 @@ class Cluster(object):
Args:
retries (int): Number of times to retry checking client table.
Returns:
True if successfully registered nodes as expected.
"""
for i in range(retries):
if not ray.is_initialized() or not self._check_registered_nodes():
time.sleep(0.1)
else:
break
return True
return False
def _check_registered_nodes(self):
registered = len([
+57 -7
View File
@@ -2,10 +2,16 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import pytest
# pytest-timeout is an optional plugin. When it cannot be imported, the name is
# bound to None so the skipif markers in this module can detect its absence.
try:
    import pytest_timeout
except ImportError:
    # Catch ImportError rather than ModuleNotFoundError: the latter is a
    # subclass that only exists on Python 3.6+, so naming it in the except
    # clause would raise NameError on older interpreters (this file's
    # __future__ imports suggest Python 2 is still supported).
    pytest_timeout = None
import time
import ray
from ray.test.cluster_utils import Cluster
@pytest.fixture
@@ -17,6 +23,28 @@ def ray_start():
ray.shutdown()
@pytest.fixture
def cluster_start():
    """Yield a Cluster with a one-CPU head node, then shut everything down.

    The head node is started with ``num_heartbeats_timeout`` set to 10 via
    its JSON-encoded ``_internal_config`` argument.
    """
    head_node_args = {
        "resources": dict(CPU=1),
        # The internal config is handed over as a JSON string.
        "_internal_config": json.dumps({"num_heartbeats_timeout": 10}),
    }
    # Start the Ray processes.
    test_cluster = Cluster(
        initialize_head=True, connect=True, head_node_args=head_node_args)
    yield test_cluster
    # Teardown: shut down ray first, then the cluster itself.
    ray.shutdown()
    test_cluster.shutdown()
# TODO(rliaw): The proper way to do this is to have the pytest config set up.
@pytest.mark.skipif(
pytest_timeout is None,
reason="Timeout package not installed; skipping test that may hang.")
@pytest.mark.timeout(10)
def test_replenish_resources(ray_start):
cluster_resources = ray.global_state.cluster_resources()
available_resources = ray.global_state.available_resources()
@@ -27,17 +55,18 @@ def test_replenish_resources(ray_start):
pass
ray.get(cpu_task.remote())
start = time.time()
resources_reset = False
timeout = 10
while not resources_reset and time.time() - start < timeout:
while not resources_reset:
available_resources = ray.global_state.available_resources()
resources_reset = (cluster_resources == available_resources)
assert resources_reset
@pytest.mark.skipif(
pytest_timeout is None,
reason="Timeout package not installed; skipping test that may hang.")
@pytest.mark.timeout(10)
def test_uses_resources(ray_start):
cluster_resources = ray.global_state.cluster_resources()
@@ -48,11 +77,32 @@ def test_uses_resources(ray_start):
cpu_task.remote()
resource_used = False
start = time.time()
timeout = 10
while not resource_used and time.time() - start < timeout:
while not resource_used:
available_resources = ray.global_state.available_resources()
resource_used = available_resources[
"CPU"] == cluster_resources["CPU"] - 1
assert resource_used
@pytest.mark.skipif(
    pytest_timeout is None,
    reason="Timeout package not installed; skipping test that may hang.")
@pytest.mark.timeout(20)
def test_add_remove_cluster_resources(cluster_start):
    """Check that the global state CPU total tracks node additions/removals."""
    test_cluster = cluster_start

    def total_cpus():
        # Re-query the global state on every call so each assertion sees
        # the current value.
        return ray.global_state.cluster_resources()["CPU"]

    # Only the head node (1 CPU) is present at the start.
    assert total_cpus() == 1

    # Adding a single one-CPU node brings the total to 2.
    added_nodes = [test_cluster.add_node(resources=dict(CPU=1))]
    assert test_cluster.wait_for_nodes()
    assert total_cpus() == 2

    # Removing that node brings the total back to 1.
    test_cluster.remove_node(added_nodes.pop())
    assert test_cluster.wait_for_nodes()
    assert total_cpus() == 1

    # Five more one-CPU nodes plus the head node give 6 CPUs in total.
    for _ in range(5):
        added_nodes.append(test_cluster.add_node(resources=dict(CPU=1)))
    assert test_cluster.wait_for_nodes()
    assert total_cpus() == 6