Moving tests from test/ to python/ray/tests/ (#3950)

This commit is contained in:
William Ma
2019-02-21 11:09:08 -08:00
committed by Robert Nishihara
parent acbe0b4e5f
commit c7a4c74f55
59 changed files with 119 additions and 103 deletions
+1 -1
View File
@@ -8,7 +8,7 @@ import argparse
import yaml
import ray
from ray.test.cluster_utils import Cluster
from ray.tests.cluster_utils import Cluster
from ray.tune.config_parser import make_parser
from ray.tune.trial import resources_to_json
from ray.tune.tune import _make_scheduler, run_experiments
File diff suppressed because it is too large Load Diff
+243
View File
@@ -0,0 +1,243 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import numpy as np
from numpy.testing import assert_equal, assert_almost_equal
import pytest
import sys
import ray
import ray.experimental.array.remote as ra
import ray.experimental.array.distributed as da
import ray.tests.cluster_utils
if sys.version_info >= (3, 0):
from importlib import reload
@pytest.fixture
def ray_start_regular():
    """Run a test against a local Ray session started with two CPUs.

    Setup reloads the remote and distributed array submodules and calls
    ``ray.init(num_cpus=2)``; teardown calls ``ray.shutdown()``.
    """
    # NOTE(review): the reload presumably lets the modules' remote functions
    # be registered again for the upcoming Ray session -- confirm.
    array_modules = (ra.core, ra.random, ra.linalg, da.core, da.random,
                     da.linalg)
    for array_module in array_modules:
        reload(array_module)

    # Start the Ray processes.
    ray.init(num_cpus=2)
    yield
    # Everything after the yield runs as teardown.
    ray.shutdown()
def test_remote_array_methods(ray_start_regular):
    """Check ``ra.eye``, ``ra.zeros`` and ``ra.linalg.qr`` against numpy."""
    # eye
    eye_val = ray.get(ra.eye.remote(3))
    assert_almost_equal(eye_val, np.eye(3))

    # zeros
    zeros_val = ray.get(ra.zeros.remote([3, 4, 5]))
    assert_equal(zeros_val, np.zeros([3, 4, 5]))

    # qr, with the input matrix passed by value
    matrix = np.random.normal(size=[10, 11])
    q_id, r_id = ra.linalg.qr.remote(matrix)
    q_val = ray.get(q_id)
    r_val = ray.get(r_id)
    assert_almost_equal(np.dot(q_val, r_val), matrix)

    # qr, with the input matrix passed as an object ID
    matrix_id = ra.random.normal.remote([10, 13])
    q_id, r_id = ra.linalg.qr.remote(matrix_id)
    matrix = ray.get(matrix_id)
    q_val = ray.get(q_id)
    r_val = ray.get(r_id)
    assert_almost_equal(np.dot(q_val, r_val), matrix)
def test_distributed_array_assemble(ray_start_regular):
    """Assemble a ones-over-zeros DistArray and compare it with numpy."""
    ones_block = ra.ones.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
    zeros_block = ra.zeros.remote([da.BLOCK_SIZE, da.BLOCK_SIZE])
    # Two blocks stacked vertically: a (2 * BLOCK_SIZE, BLOCK_SIZE) array.
    dist_array = da.DistArray([2 * da.BLOCK_SIZE, da.BLOCK_SIZE],
                              np.array([[ones_block], [zeros_block]]))

    assembled = dist_array.assemble()
    expected = np.vstack([
        np.ones([da.BLOCK_SIZE, da.BLOCK_SIZE]),
        np.zeros([da.BLOCK_SIZE, da.BLOCK_SIZE]),
    ])
    assert_equal(assembled, expected)
@pytest.fixture
def ray_start_two_nodes():
    """Run a test against a two-node cluster with 10 CPUs per node.

    Setup reloads the remote and distributed array submodules, builds a
    ``Cluster`` with two nodes and connects to it through its Redis address.
    Teardown shuts down Ray first and the cluster second.
    """
    # NOTE(review): the reload presumably lets the modules' remote functions
    # be registered again for the upcoming Ray session -- confirm.
    array_modules = (ra.core, ra.random, ra.linalg, da.core, da.random,
                     da.linalg)
    for array_module in array_modules:
        reload(array_module)

    # Start the Ray processes.
    cluster = ray.tests.cluster_utils.Cluster()
    for _ in range(2):
        cluster.add_node(num_cpus=10)
    ray.init(redis_address=cluster.redis_address)

    yield
    # Everything after the yield runs as teardown.
    ray.shutdown()
    cluster.shutdown()
def test_distributed_array_methods(ray_start_two_nodes):
    """Compare the distributed array operations with their numpy equivalents.

    Covers construction (zeros, ones, eye, random.normal), copy, triu, tril,
    dot, add, subtract, transpose and numpy_to_dist, followed by the tsqr,
    modified_lu, tsqr_hr and qr routines in ``da.linalg``.
    """
    x = da.zeros.remote([9, 25, 51], "float")
    assert_equal(ray.get(da.assemble.remote(x)), np.zeros([9, 25, 51]))

    x = da.ones.remote([11, 25, 49], dtype_name="float")
    assert_equal(ray.get(da.assemble.remote(x)), np.ones([11, 25, 49]))

    x = da.random.normal.remote([11, 25, 49])
    y = da.copy.remote(x)
    assert_equal(
        ray.get(da.assemble.remote(x)), ray.get(da.assemble.remote(y)))

    x = da.eye.remote(25, dtype_name="float")
    assert_equal(ray.get(da.assemble.remote(x)), np.eye(25))

    x = da.random.normal.remote([25, 49])
    y = da.triu.remote(x)
    assert_equal(
        ray.get(da.assemble.remote(y)), np.triu(
            ray.get(da.assemble.remote(x))))

    x = da.random.normal.remote([25, 49])
    y = da.tril.remote(x)
    assert_equal(
        ray.get(da.assemble.remote(y)), np.tril(
            ray.get(da.assemble.remote(x))))

    # test dot (the original asserted this twice; once is sufficient)
    x = da.random.normal.remote([25, 49])
    y = da.random.normal.remote([49, 18])
    z = da.dot.remote(x, y)
    w = da.assemble.remote(z)
    u = da.assemble.remote(x)
    v = da.assemble.remote(y)
    assert_almost_equal(ray.get(w), np.dot(ray.get(u), ray.get(v)))

    # test add
    x = da.random.normal.remote([23, 42])
    y = da.random.normal.remote([23, 42])
    z = da.add.remote(x, y)
    assert_almost_equal(
        ray.get(da.assemble.remote(z)),
        ray.get(da.assemble.remote(x)) + ray.get(da.assemble.remote(y)))

    # test subtract
    x = da.random.normal.remote([33, 40])
    y = da.random.normal.remote([33, 40])
    z = da.subtract.remote(x, y)
    assert_almost_equal(
        ray.get(da.assemble.remote(z)),
        ray.get(da.assemble.remote(x)) - ray.get(da.assemble.remote(y)))

    # test transpose
    x = da.random.normal.remote([234, 432])
    y = da.transpose.remote(x)
    assert_equal(
        ray.get(da.assemble.remote(x)).T, ray.get(da.assemble.remote(y)))

    # test numpy_to_dist
    x = da.random.normal.remote([23, 45])
    y = da.assemble.remote(x)
    z = da.numpy_to_dist.remote(y)
    w = da.assemble.remote(z)
    assert_equal(
        ray.get(da.assemble.remote(x)), ray.get(da.assemble.remote(z)))
    assert_equal(ray.get(y), ray.get(w))

    # test da.tsqr
    for shape in [[123, da.BLOCK_SIZE], [7, da.BLOCK_SIZE],
                  [da.BLOCK_SIZE, da.BLOCK_SIZE], [da.BLOCK_SIZE, 7],
                  [10 * da.BLOCK_SIZE, da.BLOCK_SIZE]]:
        x = da.random.normal.remote(shape)
        K = min(shape)
        q, r = da.linalg.tsqr.remote(x)
        x_val = ray.get(da.assemble.remote(x))
        q_val = ray.get(da.assemble.remote(q))
        r_val = ray.get(r)
        assert r_val.shape == (K, shape[1])
        assert_equal(r_val, np.triu(r_val))
        assert_almost_equal(x_val, np.dot(q_val, r_val))
        assert_almost_equal(np.dot(q_val.T, q_val), np.eye(K))

    # test da.linalg.modified_lu
    def check_modified_lu(d1, d2):
        """Check q - diag(s) == l * u for the q factor of a d1 x d2 matrix."""
        print("testing dist_modified_lu with d1 = " + str(d1) + ", d2 = " +
              str(d2))
        assert d1 >= d2
        m = ra.random.normal.remote([d1, d2])
        q, r = ra.linalg.qr.remote(m)
        q_dist = da.numpy_to_dist.remote(q)
        l_id, u, s = da.linalg.modified_lu.remote(q_dist)
        q_val = ray.get(q)
        # The value of r is not checked; fetching it still blocks until that
        # output is available.
        ray.get(r)
        l_val = ray.get(da.assemble.remote(l_id))
        u_val = ray.get(u)
        s_val = ray.get(s)
        # Place the entries of s on the main diagonal of a d1 x d2 matrix.
        s_mat = np.zeros((d1, d2))
        for i, s_entry in enumerate(s_val):
            s_mat[i, i] = s_entry
        # Check that q - s = l * u.
        assert_almost_equal(q_val - s_mat, np.dot(l_val, u_val))
        # Check that u is upper triangular.
        assert_equal(np.triu(u_val), u_val)
        # Check that l is lower triangular.
        assert_equal(np.tril(l_val), l_val)

    for d1, d2 in [(100, 100), (99, 98), (7, 5), (7, 7), (20, 7), (20, 10)]:
        check_modified_lu(d1, d2)

    # test dist_tsqr_hr
    def check_dist_tsqr_hr(d1, d2):
        """Check that tsqr_hr yields an orthonormal q with q * r == a."""
        print("testing dist_tsqr_hr with d1 = " + str(d1) + ", d2 = " +
              str(d2))
        a = da.random.normal.remote([d1, d2])
        y, t, y_top, r = da.linalg.tsqr_hr.remote(a)
        a_val = ray.get(da.assemble.remote(a))
        y_val = ray.get(da.assemble.remote(y))
        t_val = ray.get(t)
        y_top_val = ray.get(y_top)
        r_val = ray.get(r)
        tall_eye = np.zeros((d1, min(d1, d2)))
        np.fill_diagonal(tall_eye, 1)
        q = tall_eye - np.dot(y_val, np.dot(t_val, y_top_val.T))
        # Check that q.T * q = I.
        assert_almost_equal(np.dot(q.T, q), np.eye(min(d1, d2)))
        # Check that a = (I - y * t * y_top.T) * r.
        assert_almost_equal(np.dot(q, r_val), a_val)

    for d1, d2 in [(123, da.BLOCK_SIZE), (7, da.BLOCK_SIZE),
                   (da.BLOCK_SIZE, da.BLOCK_SIZE), (da.BLOCK_SIZE, 7),
                   (10 * da.BLOCK_SIZE, da.BLOCK_SIZE)]:
        check_dist_tsqr_hr(d1, d2)

    # test da.linalg.qr
    def check_dist_qr(d1, d2):
        """Check shapes, orthonormality, triangularity and a == q * r."""
        print("testing qr with d1 = {}, and d2 = {}.".format(d1, d2))
        a = da.random.normal.remote([d1, d2])
        K = min(d1, d2)
        q, r = da.linalg.qr.remote(a)
        a_val = ray.get(da.assemble.remote(a))
        q_val = ray.get(da.assemble.remote(q))
        r_val = ray.get(da.assemble.remote(r))
        assert q_val.shape == (d1, K)
        assert r_val.shape == (K, d2)
        assert_almost_equal(np.dot(q_val.T, q_val), np.eye(K))
        assert_equal(r_val, np.triu(r_val))
        assert_almost_equal(a_val, np.dot(q_val, r_val))

    # Each fixed shape is checked in both orientations.
    for d1, d2 in [(123, da.BLOCK_SIZE), (7, da.BLOCK_SIZE),
                   (da.BLOCK_SIZE, da.BLOCK_SIZE), (da.BLOCK_SIZE, 7),
                   (13, 21), (34, 35), (8, 7)]:
        check_dist_qr(d1, d2)
        check_dist_qr(d2, d1)

    # Plus 20 randomly sized matrices with both dimensions in [1, 35).
    for _ in range(20):
        d1 = np.random.randint(1, 35)
        d2 = np.random.randint(1, 35)
        check_dist_qr(d1, d2)
+724
View File
@@ -0,0 +1,724 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from flaky import flaky
import shutil
import tempfile
import threading
import time
import unittest
import yaml
import copy
import ray
import ray.services as services
from ray.autoscaler.autoscaler import StandardAutoscaler, LoadMetrics, \
fillout_defaults, validate_config
from ray.autoscaler.tags import TAG_RAY_NODE_TYPE, TAG_RAY_NODE_STATUS
from ray.autoscaler.node_provider import NODE_PROVIDERS, NodeProvider
import pytest
class MockNode(object):
    """In-memory record of a node tracked by the mock provider.

    A new node starts in state ``"pending"`` with a fixed external IP and an
    internal IP of the form ``172.0.0.<node_id>``.
    """

    def __init__(self, node_id, tags):
        self.node_id = node_id
        self.state = "pending"
        # The dict is stored by reference, not copied.
        self.tags = tags
        self.external_ip = "1.2.3.4"
        self.internal_ip = "172.0.0.{}".format(self.node_id)

    def matches(self, tags):
        """Return True if every key/value pair in ``tags`` is on this node."""
        return all(
            key in self.tags and self.tags[key] == value
            for key, value in tags.items())
class MockProcessRunner(object):
    """Process-runner stand-in that records commands instead of running them.

    Args:
        fail_cmds: Optional list of substrings. A command whose string form
            contains any of them makes ``check_call`` raise. Defaults to an
            empty list that is private to each instance.
    """

    def __init__(self, fail_cmds=None):
        self.calls = []
        # A fresh list per instance: a `[]` default would be one object
        # shared by every runner created without an explicit argument.
        self.fail_cmds = [] if fail_cmds is None else fail_cmds

    def check_call(self, cmd, *args, **kwargs):
        """Record ``cmd``, or raise if it matches one of ``fail_cmds``.

        Extra positional and keyword arguments are accepted and ignored.

        Raises:
            Exception: if any entry of ``fail_cmds`` occurs in ``str(cmd)``.
                The command is not recorded in that case.
        """
        cmd_text = str(cmd)
        if any(token in cmd_text for token in self.fail_cmds):
            raise Exception("Failing command on purpose")
        self.calls.append(cmd)
class MockProvider(NodeProvider):
    """NodeProvider that keeps its nodes in a dict instead of a real cloud.

    Tests steer it through three attributes:
      * ``throw``: when true, ``nodes()`` raises.
      * ``fail_creates``: when true, ``create_node()`` creates nothing.
      * ``ready_to_create``: ``create_node()`` blocks until this event is set
        (it starts out set).
    """

    def __init__(self):
        # NOTE(review): NodeProvider.__init__ is not called here.
        self.mock_nodes = {}
        self.next_id = 0
        self.throw = False
        self.fail_creates = False
        self.ready_to_create = threading.Event()
        self.ready_to_create.set()

    def nodes(self, tag_filters):
        """Return the ids of non-terminated nodes matching ``tag_filters``."""
        if self.throw:
            raise Exception("oops")
        matching_ids = []
        for node in self.mock_nodes.values():
            if node.matches(tag_filters) and node.state != "terminated":
                matching_ids.append(node.node_id)
        return matching_ids

    def is_running(self, node_id):
        node = self.mock_nodes[node_id]
        return node.state == "running"

    def is_terminated(self, node_id):
        node = self.mock_nodes[node_id]
        return node.state == "terminated"

    def node_tags(self, node_id):
        node = self.mock_nodes[node_id]
        return node.tags

    def internal_ip(self, node_id):
        node = self.mock_nodes[node_id]
        return node.internal_ip

    def external_ip(self, node_id):
        node = self.mock_nodes[node_id]
        return node.external_ip

    def create_node(self, node_config, tags, count):
        """Add ``count`` pending nodes once ``ready_to_create`` is set.

        ``node_config`` is ignored. Ids are handed out sequentially from
        ``next_id``.
        """
        self.ready_to_create.wait()
        if self.fail_creates:
            return
        # NOTE(review): every node created by one call receives the same
        # `tags` dict object, so set_node_tags() on one of them is visible
        # on the others as well.
        for _ in range(count):
            self.mock_nodes[self.next_id] = MockNode(self.next_id, tags)
            self.next_id += 1

    def set_node_tags(self, node_id, tags):
        node = self.mock_nodes[node_id]
        node.tags.update(tags)

    def terminate_node(self, node_id):
        # Terminated nodes stay in mock_nodes; nodes() filters them out.
        node = self.mock_nodes[node_id]
        node.state = "terminated"
# Baseline cluster config shared by the autoscaler tests. Tests that need a
# variation copy it and override individual keys.
SMALL_CLUSTER = {
    # Cluster sizing and scaling policy.
    "cluster_name": "default",
    "min_workers": 2,
    "max_workers": 2,
    "initial_workers": 0,
    "target_utilization_fraction": 0.8,
    "idle_timeout_minutes": 5,
    # Provider, container and SSH settings.
    "provider": {
        "type": "mock",
        "region": "us-east-1",
        "availability_zone": "us-east-1a",
    },
    "docker": {
        "image": "example",
        "container_name": "mock",
    },
    "auth": {
        "ssh_user": "ubuntu",
        "ssh_private_key": "/dev/null",
    },
    # Node launch configs.
    "head_node": {
        "TestProp": 1,
    },
    "worker_nodes": {
        "TestProp": 2,
    },
    # Files and commands applied to nodes.
    "file_mounts": {},
    "initialization_commands": ["cmd0"],
    "setup_commands": ["cmd1"],
    "head_setup_commands": ["cmd2"],
    "worker_setup_commands": ["cmd3"],
    "head_start_ray_commands": ["start_ray_head"],
    "worker_start_ray_commands": ["start_ray_worker"],
}
class LoadMetricsTest(unittest.TestCase):
    """Tests for the LoadMetrics usage and heartbeat bookkeeping."""

    # NOTE(review): the two dicts passed to LoadMetrics.update look like
    # (total resources, available resources) for the given IP -- confirm
    # against LoadMetrics.update.

    def testUpdate(self):
        # A later update for the same IP replaces the earlier one; a second
        # IP adds to the total.
        metrics = LoadMetrics()
        metrics.update("1.1.1.1", {"CPU": 2}, {"CPU": 1})
        assert metrics.approx_workers_used() == 0.5
        metrics.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
        assert metrics.approx_workers_used() == 1.0
        metrics.update("2.2.2.2", {"CPU": 2}, {"CPU": 0})
        assert metrics.approx_workers_used() == 2.0

    def testPruneByNodeIp(self):
        # After pruning to a set without 2.2.2.2, only 1.1.1.1 still counts.
        metrics = LoadMetrics()
        metrics.update("1.1.1.1", {"CPU": 1}, {"CPU": 0})
        metrics.update("2.2.2.2", {"CPU": 1}, {"CPU": 0})
        metrics.prune_active_ips({"1.1.1.1", "4.4.4.4"})
        assert metrics.approx_workers_used() == 1.0

    def testBottleneckResource(self):
        metrics = LoadMetrics()
        metrics.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
        metrics.update("2.2.2.2", {"CPU": 2, "GPU": 16},
                       {"CPU": 2, "GPU": 2})
        assert metrics.approx_workers_used() == 1.88

    def testHeartbeat(self):
        # Both update() and mark_active() leave a heartbeat entry for the IP.
        metrics = LoadMetrics()
        metrics.update("1.1.1.1", {"CPU": 2}, {"CPU": 1})
        metrics.mark_active("2.2.2.2")
        heartbeats = metrics.last_heartbeat_time_by_ip
        assert "1.1.1.1" in heartbeats
        assert "2.2.2.2" in heartbeats
        assert "3.3.3.3" not in heartbeats

    def testDebugString(self):
        metrics = LoadMetrics()
        metrics.update("1.1.1.1", {"CPU": 2}, {"CPU": 0})
        metrics.update("2.2.2.2", {"CPU": 2, "GPU": 16},
                       {"CPU": 2, "GPU": 2})
        summary = metrics.info_string()
        assert "ResourceUsage=2.0/4.0 CPU, 14.0/16.0 GPU" in summary
        assert "NumNodesConnected=2" in summary
        assert "NumNodesUsed=1.88" in summary
class AutoscalingTest(unittest.TestCase):
    """Tests for StandardAutoscaler driven by the mock provider and runner.

    Each test writes a YAML config into a per-test temporary directory and
    passes its path to the autoscaler. Variations of SMALL_CLUSTER are made
    with ``copy.deepcopy`` so that edits to nested sections (for example
    ``worker_nodes``) never leak into the shared module-level config.
    """

    def setUp(self):
        # Register a "mock" provider type whose factory hands back whatever
        # provider the test stored in self.provider.
        NODE_PROVIDERS["mock"] = \
            lambda: (None, self.create_provider)
        self.provider = None
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        del NODE_PROVIDERS["mock"]
        shutil.rmtree(self.tmpdir)
        ray.shutdown()

    def waitFor(self, condition, num_retries=50):
        """Poll ``condition()`` every 0.1s until it returns a true value.

        Raises:
            Exception: if it is still false after ``num_retries`` polls.
        """
        for _ in range(num_retries):
            if condition():
                return
            time.sleep(.1)
        raise Exception("Timed out waiting for {}".format(condition))

    def waitForNodes(self, expected, comparison=None, tag_filters=None):
        """Poll the provider until its node count satisfies ``comparison``.

        Args:
            expected: Value the node count is compared against.
            comparison: Callable invoked as ``comparison(count, expected)``
                that raises on mismatch. Defaults to ``self.assertEqual``.
            tag_filters: Tags passed to ``provider.nodes``. Defaults to an
                empty dict (no filtering).

        Raises:
            Whatever ``comparison`` raised on the 50th and final attempt.
        """
        if comparison is None:
            comparison = self.assertEqual
        if tag_filters is None:
            tag_filters = {}
        MAX_ITER = 50
        for i in range(MAX_ITER):
            n = len(self.provider.nodes(tag_filters))
            try:
                comparison(n, expected)
                return
            except Exception:
                # Only the last attempt is allowed to fail the test.
                if i == MAX_ITER - 1:
                    raise
            time.sleep(.1)

    def create_provider(self, config, cluster_name):
        """Provider factory used by the "mock" entry in NODE_PROVIDERS."""
        assert self.provider
        return self.provider

    def write_config(self, config):
        """Dump ``config`` as YAML to the test's config file; return its path.

        Every call writes the same path, so a second call replaces the
        config an existing autoscaler was created with.
        """
        path = self.tmpdir + "/simple.yaml"
        with open(path, "w") as f:
            f.write(yaml.dump(config))
        return path

    def testInvalidConfig(self):
        invalid_config = "/dev/null"
        with pytest.raises(ValueError):
            StandardAutoscaler(
                invalid_config, LoadMetrics(), update_interval_s=0)

    def testValidation(self):
        """Ensures that schema validation is working."""
        config = copy.deepcopy(SMALL_CLUSTER)
        try:
            validate_config(config)
        except Exception:
            self.fail("Test config did not pass validation test!")

        # Unknown top-level key.
        config["blah"] = "blah"
        with pytest.raises(ValueError):
            validate_config(config)
        del config["blah"]

        # Unknown key inside the provider section.
        config["provider"]["blah"] = "blah"
        with pytest.raises(ValueError):
            validate_config(config)
        del config["provider"]["blah"]

        # Missing provider section.
        del config["provider"]
        with pytest.raises(ValueError):
            validate_config(config)

    def testValidateDefaultConfig(self):
        config = {}
        config["provider"] = {
            "type": "aws",
            "region": "us-east-1",
            "availability_zone": "us-east-1a",
        }
        config = fillout_defaults(config)
        try:
            validate_config(config)
        except Exception:
            self.fail("Default config did not pass validation test!")

    def testScaleUp(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
        assert len(self.provider.nodes({})) == 0
        autoscaler.update()
        self.waitForNodes(2)
        autoscaler.update()
        self.waitForNodes(2)

    def testTerminateOutdatedNodesGracefully(self):
        config = copy.deepcopy(SMALL_CLUSTER)
        config["min_workers"] = 5
        config["max_workers"] = 5
        config_path = self.write_config(config)
        self.provider = MockProvider()
        self.provider.create_node({}, {TAG_RAY_NODE_TYPE: "worker"}, 10)
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
        self.waitForNodes(10)

        # Gradually scales down to meet target size, never going too low
        for _ in range(10):
            autoscaler.update()
            self.waitForNodes(5, comparison=self.assertLessEqual)
            self.waitForNodes(4, comparison=self.assertGreaterEqual)

        # Eventually reaches steady state
        self.waitForNodes(5)

    def testDynamicScaling(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_launch_batch=5,
            max_concurrent_launches=5,
            max_failures=0,
            update_interval_s=0)
        self.waitForNodes(0)
        autoscaler.update()
        self.waitForNodes(2)

        # Update the config to reduce the cluster size
        new_config = copy.deepcopy(SMALL_CLUSTER)
        new_config["max_workers"] = 1
        self.write_config(new_config)
        autoscaler.update()
        self.waitForNodes(1)

        # Update the config to increase the cluster size
        new_config["min_workers"] = 10
        new_config["max_workers"] = 10
        self.write_config(new_config)
        autoscaler.update()
        self.waitForNodes(6)
        autoscaler.update()
        self.waitForNodes(10)

    def testInitialWorkers(self):
        config = copy.deepcopy(SMALL_CLUSTER)
        config["min_workers"] = 0
        config["max_workers"] = 20
        config["initial_workers"] = 10
        config_path = self.write_config(config)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_launch_batch=5,
            max_concurrent_launches=5,
            max_failures=0,
            update_interval_s=0)
        self.waitForNodes(0)
        autoscaler.update()
        self.waitForNodes(5)  # expected due to batch sizes and concurrency
        autoscaler.update()
        self.waitForNodes(10)
        autoscaler.update()

    def testDelayedLaunch(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_launch_batch=5,
            max_concurrent_launches=5,
            max_failures=0,
            update_interval_s=0)
        assert len(self.provider.nodes({})) == 0

        # Update will try to create, but will block until we set the flag
        self.provider.ready_to_create.clear()
        autoscaler.update()
        assert autoscaler.num_launches_pending.value == 2
        assert len(self.provider.nodes({})) == 0

        # Set the flag, check it updates
        self.provider.ready_to_create.set()
        self.waitForNodes(2)
        assert autoscaler.num_launches_pending.value == 0

        # Update the config to reduce the cluster size
        new_config = copy.deepcopy(SMALL_CLUSTER)
        new_config["max_workers"] = 1
        self.write_config(new_config)
        autoscaler.update()
        assert len(self.provider.nodes({})) == 1

    def testDelayedLaunchWithFailure(self):
        config = copy.deepcopy(SMALL_CLUSTER)
        config["min_workers"] = 10
        config["max_workers"] = 10
        config_path = self.write_config(config)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_launch_batch=5,
            max_concurrent_launches=8,
            max_failures=0,
            update_interval_s=0)
        assert len(self.provider.nodes({})) == 0

        # update() should launch a wave of 5 nodes (max_launch_batch)
        # Force this first wave to block.
        rtc1 = self.provider.ready_to_create
        rtc1.clear()
        autoscaler.update()
        # Synchronization: wait for launchy thread to be blocked on rtc1.
        # This reads private attributes of threading.Event, whose names
        # differ between the two Python versions handled below.
        if hasattr(rtc1, '_cond'):  # Python 3.5
            waiters = rtc1._cond._waiters
        else:  # Python 2.7
            waiters = rtc1._Event__cond._Condition__waiters
        self.waitFor(lambda: len(waiters) == 1)
        assert autoscaler.num_launches_pending.value == 5
        assert len(self.provider.nodes({})) == 0

        # Call update() to launch a second wave of 3 nodes,
        # as 5 + 3 = 8 = max_concurrent_launches.
        # Make this wave complete immediately.
        rtc2 = threading.Event()
        self.provider.ready_to_create = rtc2
        rtc2.set()
        autoscaler.update()
        self.waitForNodes(3)
        assert autoscaler.num_launches_pending.value == 5

        # The first wave of 5 will now tragically fail
        self.provider.fail_creates = True
        rtc1.set()
        self.waitFor(lambda: autoscaler.num_launches_pending.value == 0)
        assert len(self.provider.nodes({})) == 3

        # Retry the first wave, allowing it to succeed this time
        self.provider.fail_creates = False
        autoscaler.update()
        self.waitForNodes(8)
        assert autoscaler.num_launches_pending.value == 0

        # Final wave of 2 nodes
        autoscaler.update()
        self.waitForNodes(10)
        assert autoscaler.num_launches_pending.value == 0

    def testUpdateThrottling(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_launch_batch=5,
            max_concurrent_launches=5,
            max_failures=0,
            update_interval_s=10)
        autoscaler.update()
        self.waitForNodes(2)
        assert autoscaler.num_launches_pending.value == 0
        new_config = copy.deepcopy(SMALL_CLUSTER)
        new_config["max_workers"] = 1
        self.write_config(new_config)
        autoscaler.update()
        # not updated yet
        # note that node termination happens in the main thread, so
        # we do not need to add any delay here before checking
        assert len(self.provider.nodes({})) == 2
        assert autoscaler.num_launches_pending.value == 0

    def testLaunchConfigChange(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
        autoscaler.update()
        self.waitForNodes(2)

        # Update the config to change the node type. The deep copy matters
        # here: this writes into the nested "worker_nodes" dict, which a
        # shallow copy would share with SMALL_CLUSTER itself.
        new_config = copy.deepcopy(SMALL_CLUSTER)
        new_config["worker_nodes"]["InstanceType"] = "updated"
        self.write_config(new_config)
        self.provider.ready_to_create.clear()
        for _ in range(5):
            autoscaler.update()
        self.waitForNodes(0)
        self.provider.ready_to_create.set()
        self.waitForNodes(2)

    def testIgnoresCorruptedConfig(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_launch_batch=10,
            max_concurrent_launches=10,
            max_failures=0,
            update_interval_s=0)
        autoscaler.update()
        self.waitForNodes(2)

        # Write a corrupted config
        self.write_config("asdf")
        for _ in range(10):
            autoscaler.update()
        time.sleep(0.1)
        assert autoscaler.num_launches_pending.value == 0
        assert len(self.provider.nodes({})) == 2

        # Write a good config again
        new_config = copy.deepcopy(SMALL_CLUSTER)
        new_config["min_workers"] = 10
        new_config["max_workers"] = 10
        self.write_config(new_config)
        autoscaler.update()
        self.waitForNodes(10)

    def testMaxFailures(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        self.provider.throw = True
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=2, update_interval_s=0)
        # The first two failing updates are tolerated; the third raises.
        autoscaler.update()
        autoscaler.update()
        with pytest.raises(Exception):
            autoscaler.update()

    def testLaunchNewNodeOnOutOfBandTerminate(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
        autoscaler.update()
        autoscaler.update()
        self.waitForNodes(2)
        # Terminate the nodes behind the autoscaler's back.
        for node in self.provider.mock_nodes.values():
            node.state = "terminated"
        assert len(self.provider.nodes({})) == 0
        autoscaler.update()
        self.waitForNodes(2)

    def testConfiguresNewNodes(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        runner = MockProcessRunner()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_failures=0,
            process_runner=runner,
            update_interval_s=0)
        autoscaler.update()
        autoscaler.update()
        self.waitForNodes(2)
        for node in self.provider.mock_nodes.values():
            node.state = "running"
        autoscaler.update()
        self.waitForNodes(2, tag_filters={TAG_RAY_NODE_STATUS: "up-to-date"})

    def testReportsConfigFailures(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        # "cmd1" is SMALL_CLUSTER's setup command, so node setup fails.
        runner = MockProcessRunner(fail_cmds=["cmd1"])
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_failures=0,
            process_runner=runner,
            update_interval_s=0)
        autoscaler.update()
        autoscaler.update()
        self.waitForNodes(2)
        for node in self.provider.mock_nodes.values():
            node.state = "running"
        autoscaler.update()
        self.waitForNodes(
            2, tag_filters={TAG_RAY_NODE_STATUS: "update-failed"})

    def testConfiguresOutdatedNodes(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        runner = MockProcessRunner()
        autoscaler = StandardAutoscaler(
            config_path,
            LoadMetrics(),
            max_failures=0,
            process_runner=runner,
            update_interval_s=0)
        autoscaler.update()
        autoscaler.update()
        self.waitForNodes(2)
        for node in self.provider.mock_nodes.values():
            node.state = "running"
        autoscaler.update()
        self.waitForNodes(2, tag_filters={TAG_RAY_NODE_STATUS: "up-to-date"})

        # Changing the worker setup commands should make the autoscaler
        # issue commands to the existing nodes again.
        runner.calls = []
        new_config = copy.deepcopy(SMALL_CLUSTER)
        new_config["worker_setup_commands"] = ["cmdX", "cmdY"]
        self.write_config(new_config)
        autoscaler.update()
        autoscaler.update()
        self.waitFor(lambda: len(runner.calls) > 0)

    def testScaleUpBasedOnLoad(self):
        config = copy.deepcopy(SMALL_CLUSTER)
        config["min_workers"] = 1
        config["max_workers"] = 10
        config["target_utilization_fraction"] = 0.5
        config_path = self.write_config(config)
        self.provider = MockProvider()
        lm = LoadMetrics()
        autoscaler = StandardAutoscaler(
            config_path, lm, max_failures=0, update_interval_s=0)
        assert len(self.provider.nodes({})) == 0
        autoscaler.update()
        self.waitForNodes(1)
        autoscaler.update()
        assert autoscaler.num_launches_pending.value == 0
        assert len(self.provider.nodes({})) == 1

        # Scales up as nodes are reported as used
        local_ip = services.get_node_ip_address()
        lm.update(local_ip, {"CPU": 2}, {"CPU": 0})  # head
        lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 0})  # worker 1
        autoscaler.update()
        self.waitForNodes(3)
        lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 0})
        autoscaler.update()
        self.waitForNodes(5)

        # Holds steady when load is removed
        lm.update("172.0.0.0", {"CPU": 2}, {"CPU": 2})
        lm.update("172.0.0.1", {"CPU": 2}, {"CPU": 2})
        autoscaler.update()
        assert autoscaler.num_launches_pending.value == 0
        assert len(self.provider.nodes({})) == 5

        # Scales down as nodes become unused
        lm.last_used_time_by_ip["172.0.0.0"] = 0
        lm.last_used_time_by_ip["172.0.0.1"] = 0
        autoscaler.update()
        assert autoscaler.num_launches_pending.value == 0
        assert len(self.provider.nodes({})) == 3
        lm.last_used_time_by_ip["172.0.0.2"] = 0
        lm.last_used_time_by_ip["172.0.0.3"] = 0
        autoscaler.update()
        assert autoscaler.num_launches_pending.value == 0
        assert len(self.provider.nodes({})) == 1

    def testDontScaleBelowTarget(self):
        config = copy.deepcopy(SMALL_CLUSTER)
        config["min_workers"] = 0
        config["max_workers"] = 2
        config["target_utilization_fraction"] = 0.5
        config_path = self.write_config(config)
        self.provider = MockProvider()
        lm = LoadMetrics()
        autoscaler = StandardAutoscaler(
            config_path, lm, max_failures=0, update_interval_s=0)
        assert len(self.provider.nodes({})) == 0
        autoscaler.update()
        assert autoscaler.num_launches_pending.value == 0
        assert len(self.provider.nodes({})) == 0

        # Scales up as nodes are reported as used
        local_ip = services.get_node_ip_address()
        lm.update(local_ip, {"CPU": 2}, {"CPU": 0})  # head
        # 1.0 nodes used => target nodes = 2 => target workers = 1
        autoscaler.update()
        self.waitForNodes(1)

        # Make new node idle, and never used.
        # Should hold steady as target is still 2.
        lm.update("172.0.0.0", {"CPU": 0}, {"CPU": 0})
        lm.last_used_time_by_ip["172.0.0.0"] = 0
        autoscaler.update()
        assert len(self.provider.nodes({})) == 1

        # Reduce load on head => target nodes = 1 => target workers = 0
        lm.update(local_ip, {"CPU": 2}, {"CPU": 1})
        autoscaler.update()
        assert len(self.provider.nodes({})) == 0

    @flaky(max_runs=4)
    def testRecoverUnhealthyWorkers(self):
        config_path = self.write_config(SMALL_CLUSTER)
        self.provider = MockProvider()
        runner = MockProcessRunner()
        lm = LoadMetrics()
        autoscaler = StandardAutoscaler(
            config_path,
            lm,
            max_failures=0,
            process_runner=runner,
            update_interval_s=0)
        autoscaler.update()
        self.waitForNodes(2)
        for node in self.provider.mock_nodes.values():
            node.state = "running"
        autoscaler.update()
        self.waitForNodes(2, tag_filters={TAG_RAY_NODE_STATUS: "up-to-date"})

        # Mark a node as unhealthy
        lm.last_heartbeat_time_by_ip["172.0.0.0"] = 0
        num_calls = len(runner.calls)
        autoscaler.update()
        # Recovery shows up as additional commands sent through the runner.
        self.waitFor(lambda: len(runner.calls) > num_calls, num_retries=150)

    def testExternalNodeScaler(self):
        config = copy.deepcopy(SMALL_CLUSTER)
        config["provider"] = {
            "type": "external",
            "module": "ray.autoscaler.node_provider.NodeProvider",
        }
        config_path = self.write_config(config)
        autoscaler = StandardAutoscaler(
            config_path, LoadMetrics(), max_failures=0, update_interval_s=0)
        assert isinstance(autoscaler.provider, NodeProvider)

    def testExternalNodeScalerWrongImport(self):
        config = copy.deepcopy(SMALL_CLUSTER)
        config["provider"] = {
            "type": "external",
            "module": "mymodule.provider_class",
        }
        invalid_provider = self.write_config(config)
        with pytest.raises(ImportError):
            StandardAutoscaler(
                invalid_provider, LoadMetrics(), update_interval_s=0)

    def testExternalNodeScalerWrongModuleFormat(self):
        config = copy.deepcopy(SMALL_CLUSTER)
        config["provider"] = {
            "type": "external",
            "module": "does-not-exist",
        }
        invalid_provider = self.write_config(config)
        with pytest.raises(ValueError):
            StandardAutoscaler(
                invalid_provider, LoadMetrics(), update_interval_s=0)
if __name__ == "__main__":
    # Running the file directly executes the unittest cases verbosely.
    unittest.main(verbosity=2)
File diff suppressed because it is too large Load Diff
+472
View File
@@ -0,0 +1,472 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import json
import signal
import sys
import time
import numpy as np
import pytest
import ray
import ray.ray_constants as ray_constants
from ray.tests.cluster_utils import Cluster
from ray.tests.utils import run_string_as_driver_nonblocking
@pytest.fixture
def shutdown_only():
    """Do no setup; call ``ray.shutdown()`` once the test is over.

    Meant for tests that call ``ray.init`` themselves.
    """
    yield
    # Everything after the yield runs as teardown.
    ray.shutdown()
@pytest.fixture
def ray_start_cluster():
    """Yield a connected cluster with a head node and three worker nodes.

    Every node, the head included, is started with 4 CPUs and the same
    ``_internal_config`` JSON (reconstruction timeout and heartbeat-timeout
    settings). Teardown shuts down Ray first and the cluster second.
    """
    node_args = {
        "num_cpus": 4,
        "_internal_config": json.dumps({
            "initial_reconstruction_timeout_milliseconds": 1000,
            "num_heartbeats_timeout": 10
        })
    }
    # Start with 3 worker nodes and 4 cores each.
    cluster = Cluster(
        initialize_head=True, connect=True, head_node_args=node_args)
    # The node handles returned by add_node were collected but never read,
    # so they are no longer kept.
    for _ in range(3):
        cluster.add_node(**node_args)
    cluster.wait_for_nodes()

    yield cluster
    # Everything after the yield runs as teardown.
    ray.shutdown()
    cluster.shutdown()
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Not working with new GCS API.")
def test_dying_worker_get(shutdown_only):
    """Check that the plasma store and raylet survive a worker dying in a get.

    A worker is blocked in ``ray.get`` on an object that never gets produced,
    then killed with SIGKILL. The object is created afterwards, and the test
    asserts that the remaining Ray processes are still alive.
    """
    # Start the Ray processes.
    ray.init(num_cpus=2)

    @ray.remote
    def sleep_forever():
        time.sleep(10**6)

    @ray.remote
    def get_worker_pid():
        return os.getpid()

    pending_id = sleep_forever.remote()
    time.sleep(0.01)  # Try to wait for the sleep task to get scheduled.
    # Get the PID of the other worker.
    pid_to_kill = ray.get(get_worker_pid.remote())

    @ray.remote
    def f(id_in_a_list):
        ray.get(id_in_a_list[0])

    # Have the worker wait in a get call.
    blocked_id = f.remote([pending_id])
    time.sleep(1)

    # Make sure the task hasn't finished.
    ready, _ = ray.wait([blocked_id], timeout=0)
    assert len(ready) == 0

    # Kill the worker.
    os.kill(pid_to_kill, signal.SIGKILL)
    time.sleep(0.1)

    # Make sure the sleep task hasn't finished.
    ready, _ = ray.wait([pending_id], timeout=0)
    assert len(ready) == 0

    # Seal the object so the store attempts to notify the worker that the
    # get has been fulfilled.
    ray.worker.global_worker.put_object(pending_id, 1)
    time.sleep(0.1)

    # Make sure that nothing has died.
    assert ray.services.remaining_processes_alive()
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Not working with new GCS API.")
def test_dying_driver_get(shutdown_only):
    """Check that the plasma store and raylet survive a driver dying in a get.

    A second driver process connects to the cluster and blocks in ``ray.get``
    on an object that never gets produced, then is killed. The object is
    created afterwards, and the test asserts that the remaining Ray processes
    are still alive.
    """
    # Start the Ray processes.
    address_info = ray.init(num_cpus=1)

    @ray.remote
    def sleep_forever():
        time.sleep(10**6)

    pending_id = sleep_forever.remote()

    # Script for the second driver; it must stay flush-left because it is
    # run as a standalone program.
    driver = """
import ray
ray.init("{}")
ray.get(ray.ObjectID(ray.utils.hex_to_binary("{}")))
""".format(address_info["redis_address"], pending_id.hex())

    driver_process = run_string_as_driver_nonblocking(driver)
    # Make sure the driver is running.
    time.sleep(1)
    assert driver_process.poll() is None

    # Kill the driver process.
    driver_process.kill()
    driver_process.wait()
    time.sleep(0.1)

    # Make sure the original task hasn't finished.
    ready, _ = ray.wait([pending_id], timeout=0)
    assert len(ready) == 0

    # Seal the object so the store attempts to notify the worker that the
    # get has been fulfilled.
    ray.worker.global_worker.put_object(pending_id, 1)
    time.sleep(0.1)

    # Make sure that nothing has died.
    assert ray.services.remaining_processes_alive()
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Not working with new GCS API.")
def test_dying_worker_wait(shutdown_only):
    """Check that the plasma store and raylet survive a worker dying in a wait.

    A worker is blocked in ``ray.wait`` on an object that never gets
    produced, then killed with SIGKILL. The object is created afterwards,
    and the test asserts that the remaining Ray processes are still alive.
    """
    ray.init(num_cpus=2)

    @ray.remote
    def sleep_forever():
        time.sleep(10**6)

    @ray.remote
    def get_pid():
        return os.getpid()

    pending_id = sleep_forever.remote()
    # Get the PID of the worker that block_in_wait will run on (sleep a little
    # to make sure that sleep_forever has already started).
    time.sleep(0.1)
    pid_to_kill = ray.get(get_pid.remote())

    @ray.remote
    def block_in_wait(object_id_in_list):
        ray.wait(object_id_in_list)

    # Have the worker wait in a wait call.
    block_in_wait.remote([pending_id])
    time.sleep(0.1)

    # Kill the worker.
    os.kill(pid_to_kill, signal.SIGKILL)
    time.sleep(0.1)

    # Create the object.
    ray.worker.global_worker.put_object(pending_id, 1)
    time.sleep(0.1)

    # Make sure that nothing has died.
    assert ray.services.remaining_processes_alive()
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Not working with new GCS API.")
def test_dying_driver_wait(shutdown_only):
    """Check that the plasma store and raylet survive a driver dying in a wait.

    A second driver process connects to the cluster and blocks in
    ``ray.wait`` on an object that never gets produced, then is killed. The
    object is created afterwards, and the test asserts that the remaining
    Ray processes are still alive.
    """
    # Start the Ray processes.
    address_info = ray.init(num_cpus=1)

    @ray.remote
    def sleep_forever():
        time.sleep(10**6)

    pending_id = sleep_forever.remote()

    # Script for the second driver; it must stay flush-left because it is
    # run as a standalone program.
    driver = """
import ray
ray.init("{}")
ray.wait([ray.ObjectID(ray.utils.hex_to_binary("{}"))])
""".format(address_info["redis_address"], pending_id.hex())

    driver_process = run_string_as_driver_nonblocking(driver)
    # Make sure the driver is running.
    time.sleep(1)
    assert driver_process.poll() is None

    # Kill the driver process.
    driver_process.kill()
    driver_process.wait()
    time.sleep(0.1)

    # Make sure the original task hasn't finished.
    ready, _ = ray.wait([pending_id], timeout=0)
    assert len(ready) == 0

    # Seal the object so the store attempts to notify the worker that the
    # wait can return.
    ray.worker.global_worker.put_object(pending_id, 1)
    time.sleep(0.1)

    # Make sure that nothing has died.
    assert ray.services.remaining_processes_alive()
@pytest.fixture(params=[(1, 4), (4, 4)])
def ray_start_workers_separate_multinode(request):
    """Start a local cluster and yield ``(num_nodes, num_initial_workers)``.

    Each parameter is a ``(number of nodes, CPUs per node)`` pair. The
    cluster and the driver connection are shut down on teardown.
    """
    num_nodes, num_initial_workers = request.param[0], request.param[1]

    # Bring up one node per requested node, then connect the driver.
    cluster = Cluster()
    for _ in range(num_nodes):
        cluster.add_node(num_cpus=num_initial_workers)
    ray.init(redis_address=cluster.redis_address)

    yield num_nodes, num_initial_workers

    # Teardown: disconnect the driver, then stop the cluster.
    ray.shutdown()
    cluster.shutdown()
def test_worker_failed(ray_start_workers_separate_multinode):
    """Kill the workers mid-task; ``ray.get`` must return or raise cleanly."""
    num_nodes, num_initial_workers = ray_start_workers_separate_multinode
    total_workers = num_nodes * num_initial_workers

    @ray.remote
    def get_pids():
        time.sleep(0.25)
        return os.getpid()

    # Keep sampling worker PIDs until ``total_workers`` distinct ones have
    # been collected, giving up after a minute.
    began_at = time.time()
    pids = set()
    while len(pids) < total_workers:
        pids.update(
            ray.get([get_pids.remote() for _ in range(2 * total_workers)]))
        if time.time() - began_at > 60:
            raise Exception("Timed out while waiting to get worker PIDs.")

    @ray.remote
    def f(x):
        time.sleep(0.5)
        return x

    # Submit more tasks than there are workers so that all workers and
    # cores are utilized: one independent round plus one dependent round.
    first_round = [f.remote(i) for i in range(total_workers)]
    object_ids = first_round + [f.remote(dep) for dep in first_round]

    # Give the tasks a moment to begin executing.
    time.sleep(0.1)

    # Kill the workers one by one while the tasks execute.
    for pid in pids:
        os.kill(pid, signal.SIGKILL)
        time.sleep(0.1)

    # Each result must either be retrievable or fail with one of the
    # expected Ray exceptions.
    for object_id in object_ids:
        try:
            ray.get(object_id)
        except (ray.exceptions.RayTaskError, ray.exceptions.RayWorkerError):
            pass
@pytest.fixture
def ray_initialize_cluster():
    """Start a four-node cluster, connect the driver and yield the cluster.

    Every node gets eight CPUs and the same internal config overrides. The
    driver connection and the cluster are shut down on teardown.
    """
    num_nodes = 4
    num_workers_per_scheduler = 8
    internal_config = json.dumps({
        "initial_reconstruction_timeout_milliseconds": 1000,
        "num_heartbeats_timeout": 10,
    })

    cluster = Cluster()
    for _ in range(num_nodes):
        cluster.add_node(
            num_cpus=num_workers_per_scheduler,
            _internal_config=internal_config)
    ray.init(redis_address=cluster.redis_address)

    yield cluster

    # Teardown: disconnect the driver, then stop the cluster.
    ray.shutdown()
    cluster.shutdown()
def _test_component_failed(cluster, component_type):
    """Kill a component on all worker nodes and check workload succeeds.

    Args:
        cluster: Cluster whose nodes (all but the first, the head node) get
            the component killed.
        component_type: Key into each node's ``all_processes`` mapping that
            selects the process to kill.
    """

    @ray.remote
    def f(x):
        return x

    @ray.remote
    def g(*xs):
        return 1

    # Kill the component on every node except the head node. A fresh round
    # of tasks is submitted before each kill so that work is in flight when
    # the component goes away.
    time.sleep(0.1)
    worker_nodes = cluster.list_all_nodes()[1:]
    assert len(worker_nodes) > 0
    for node in worker_nodes:
        process = node.all_processes[component_type][0].process

        # A long dependency chain ...
        chain_tail = 1
        for _ in range(1000):
            chain_tail = f.remote(chain_tail)

        # ... and tasks that depend on every result submitted so far.
        fan_in = [g.remote(1)]
        for _ in range(100):
            fan_in.append(g.remote(*fan_in))
            fan_in.append(g.remote(1))

        # Terminate the component, escalate to kill after a second, and
        # confirm it has really exited.
        process.terminate()
        time.sleep(1)
        process.kill()
        process.wait()
        assert process.poll() is not None

        # The objects must still be retrievable after the executing tasks
        # died.
        ray.get(chain_tail)
        ray.get(fan_in)
def check_components_alive(cluster, component_type, check_component_alive):
    """Check the state of one component type on all worker nodes.

    Worker nodes are every node returned by ``cluster.list_all_nodes()``
    except the first one.

    Args:
        cluster: Object providing ``list_all_nodes()``.
        component_type: Key into each node's ``all_processes`` mapping; the
            first process registered under that key is checked.
        check_component_alive: If true, assert that the process is still
            running. Otherwise block until the process exits and assert
            that it has terminated.

    Raises:
        AssertionError: If there are no worker nodes or a process is not in
            the expected state.
    """
    worker_nodes = cluster.list_all_nodes()[1:]
    assert len(worker_nodes) > 0
    for node in worker_nodes:
        process = node.all_processes[component_type][0].process
        if check_component_alive:
            assert process.poll() is None
            continue

        # The original messages ran the PID straight into "to terminate";
        # build the description once so both messages stay readable.
        description = component_type + " with PID " + str(process.pid)
        print("waiting for " + description + " to terminate")
        process.wait()
        print("done waiting for " + description + " to terminate")
        assert process.poll() is not None
def test_raylet_failed(ray_initialize_cluster):
    """Killing worker-node raylets must leave their plasma stores running."""
    running_cluster = ray_initialize_cluster
    # Kill the raylet on every worker node while tasks are being submitted.
    _test_component_failed(running_cluster, ray_constants.PROCESS_TYPE_RAYLET)
    # The plasma stores on the worker nodes should have survived.
    check_components_alive(
        running_cluster,
        ray_constants.PROCESS_TYPE_PLASMA_STORE,
        True)
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Hanging with new GCS API.")
def test_plasma_store_failed(ray_initialize_cluster):
    """Killing worker-node plasma stores must also bring the raylets down."""
    running_cluster = ray_initialize_cluster
    # Kill the plasma store on every worker node while tasks are submitted.
    _test_component_failed(
        running_cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE)
    # Neither the plasma store nor the raylet should be left alive on the
    # worker nodes.
    for component_type in (ray_constants.PROCESS_TYPE_PLASMA_STORE,
                           ray_constants.PROCESS_TYPE_RAYLET):
        check_components_alive(running_cluster, component_type, False)
def test_actor_creation_node_failure(ray_start_cluster):
    """Actors keep being (re)creatable while nodes are removed one by one."""
    # TODO(swang): Refactor test_raylet_failed, etc to reuse the below code.
    cluster = ray_start_cluster

    @ray.remote
    class Child(object):
        def __init__(self, death_probability):
            self.death_probability = death_probability

        def ping(self):
            # Exit the process with probability ``death_probability``.
            if np.random.rand() < self.death_probability:
                sys.exit(-1)

    num_children = 50
    # Children actors will die about half the time.
    death_probability = 0.5
    children = [Child.remote(death_probability) for _ in range(num_children)]

    while len(cluster.list_all_nodes()) > 1:
        for _ in range(2):
            # Ping every actor. About half of the actors will fail.
            pings = [child.ping.remote() for child in children]
            # Wait for all the pings to complete. This should trigger
            # reconstruction for any actor creation tasks that were
            # forwarded to nodes that then failed.
            ready, _ = ray.wait(
                pings, num_returns=len(pings), timeout=5 * 60.0)
            assert len(ready) == len(pings)

            # Replace every actor whose ping reports that the actor died.
            for index, ping in enumerate(pings):
                try:
                    ray.get(ping)
                except ray.exceptions.RayActorError:
                    children[index] = Child.remote(death_probability)

        # Remove the last node. Any actor creation tasks that were forwarded
        # to this node must be reconstructed.
        cluster.remove_node(cluster.list_all_nodes()[-1])
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Hanging with new GCS API.")
def test_driver_lives_sequential(shutdown_only):
    """The driver survives its Ray processes being killed one at a time."""
    ray.init(num_cpus=1)
    local_node = ray.worker._global_node
    local_node.kill_raylet()
    local_node.kill_plasma_store()
    local_node.kill_log_monitor()
    local_node.kill_monitor()
    local_node.kill_raylet_monitor()
    # Reaching the fixture teardown means the driver is still alive.
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Hanging with new GCS API.")
def test_driver_lives_parallel(shutdown_only):
    """The driver survives all of its Ray processes being killed at once."""
    ray.init(num_cpus=1)
    all_processes = ray.worker._global_node.all_processes

    # Collect the process records of the five component types to kill.
    process_infos = []
    for component_type in (ray_constants.PROCESS_TYPE_PLASMA_STORE,
                           ray_constants.PROCESS_TYPE_RAYLET,
                           ray_constants.PROCESS_TYPE_LOG_MONITOR,
                           ray_constants.PROCESS_TYPE_MONITOR,
                           ray_constants.PROCESS_TYPE_RAYLET_MONITOR):
        process_infos = process_infos + all_processes[component_type]
    assert len(process_infos) == 5

    # Kill all the components in parallel: terminate everything first, then
    # escalate to kill, then reap.
    for info in process_infos:
        info.process.terminate()
    time.sleep(0.1)
    for info in process_infos:
        info.process.kill()
    for info in process_infos:
        info.process.wait()
    # Reaching the fixture teardown means the driver is still alive.
+42
View File
@@ -0,0 +1,42 @@
from __future__ import absolute_import, division, print_function
import os
import unittest
import redis
import ray
def parse_client(addr_port_str):
    """Return a ``redis.StrictRedis`` client for a ``"host:port"`` string."""
    host, port = addr_port_str.split(":")
    return redis.StrictRedis(host=host, port=port)
# Only "on" enables the new GCS, matching the RAY_USE_NEW_GCS checks used by
# the other tests; any other value (including "off") skips this test.
@unittest.skipIf(
    os.environ.get("RAY_USE_NEW_GCS") != "on",
    "Tests functionality of the new GCS.")
class CredisTest(unittest.TestCase):
    """Check that Ray started credis when the new GCS is enabled."""

    def setUp(self):
        # ray.init returns the address info dict inspected by the test.
        self.config = ray.init(num_cpus=0)

    def tearDown(self):
        ray.shutdown()

    def test_credis_started(self):
        """The primary and its first shard respond as a one-member chain."""
        assert "redis_address" in self.config
        primary = parse_client(self.config['redis_address'])
        assert primary.ping() is True

        # The first entry of the 'RedisShards' list is the shard's address.
        member = primary.lrange('RedisShards', 0, -1)[0]
        shard = parse_client(member.decode())

        # Check that primary has loaded credis' master module.
        chain = primary.execute_command('MASTER.GET_CHAIN')
        assert len(chain) == 1

        # Check that the shard has loaded credis' member module.
        assert chain[0] == member
        assert shard.execute_command('MEMBER.SN') == -1
# Allow running this test file directly as a script, with verbose output.
if __name__ == "__main__":
    unittest.main(verbosity=2)
+57
View File
@@ -0,0 +1,57 @@
from __future__ import absolute_import
from __future__ import print_function
import math
import numpy as np
import unittest
import ray
import cython_examples as cyth
def get_ray_result(cython_func, *args):
    """Run *cython_func* as a Ray remote function and return its result.

    Args:
        cython_func: Callable to wrap with ``ray.remote``.
        *args: Positional arguments forwarded to the remote call.
    """
    remote_func = ray.remote(cython_func)
    object_id = remote_func.remote(*args)
    return ray.get(object_id)
class CythonTest(unittest.TestCase):
    """Run the ``cython_examples`` functions and class through Ray."""

    def setUp(self):
        ray.init()

    def tearDown(self):
        ray.shutdown()

    def assertEqualHelper(self, cython_func, expected, *args):
        """Assert that the remote call ``cython_func(*args)`` gives *expected*."""
        actual = get_ray_result(cython_func, *args)
        assert actual == expected

    def test_simple_func(self):
        self.assertEqualHelper(cyth.simple_func, 6, 1, 2, 3)
        # Every Fibonacci variant must agree that fib(10) == 55.
        self.assertEqualHelper(cyth.fib, 55, 10)
        self.assertEqualHelper(cyth.fib_int, 55, 10)
        self.assertEqualHelper(cyth.fib_cpdef, 55, 10)
        self.assertEqualHelper(cyth.fib_cdef, 55, 10)

    def test_simple_class(self):
        # Two independent actors: each keeps its own counter.
        actor_cls = ray.remote(cyth.simple_class)
        first_actor = actor_cls.remote()
        second_actor = actor_cls.remote()

        first_count = ray.get(first_actor.increment.remote())
        second_count = ray.get(second_actor.increment.remote())
        second_count_again = ray.get(second_actor.increment.remote())

        assert first_count == 1
        assert second_count == 1
        assert second_count_again == 2

    def test_numpy(self):
        values = np.array([-1.0, 0.0, 1.0, 2.0])
        # Expected result: log of positive entries, -inf everywhere else.
        expected = [
            float("-inf") if x <= 0 else math.log(x) for x in values
        ]
        actual = get_ray_result(cyth.masked_log, values)
        np.testing.assert_array_equal(expected, actual)
# Allow running this test file directly as a script, with verbose output.
if __name__ == "__main__":
    unittest.main(verbosity=2)
+50
View File
@@ -0,0 +1,50 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import subprocess
import sys
import pytest
import ray
@pytest.fixture
def ray_gdb_start():
    """Set the GDB/TMUX environment flags for the test, then undo them.

    Sets ``RAY_<NAME>_GDB`` and ``RAY_<NAME>_TMUX`` to ``"1"`` for the raylet
    and the plasma store. On teardown the environment is restored to its
    saved copy and Ray is shut down. The fixture itself does not start Ray.
    """
    saved_environ = os.environ.copy()
    for process_name in ("RAYLET", "PLASMA_STORE"):
        os.environ["RAY_{}_GDB".format(process_name)] = "1"
        os.environ["RAY_{}_TMUX".format(process_name)] = "1"

    yield None

    # Teardown: put the original environment back and stop Ray.
    os.environ.clear()
    os.environ.update(saved_environ)
    ray.shutdown()
@pytest.mark.skipif(
    sys.platform not in ("linux", "linux2"),
    reason="This test requires Linux.")
def test_raylet_gdb(ray_gdb_start):
    """With the GDB flags set, the raylet and plasma store run under gdb."""
    # The ray_gdb_start fixture has already set the environment flags.
    ray.init(num_cpus=1)

    @ray.remote
    def f():
        return 42

    assert ray.get(f.remote()) == 42

    # For each component, look for a gdb process whose command line mentions
    # it, then kill that gdb process.
    for process_name in ("raylet/raylet", "plasma/plasma_store_server"):
        pattern = "gdb.*{}".format(process_name)
        pgrep_command = subprocess.Popen(
            ["pgrep", "-f", pattern],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
        # pgrep prints matching PIDs; empty output means nothing matched.
        assert pgrep_command.communicate()[0]
        subprocess.call(["pkill", "-f", pattern])
+724
View File
@@ -0,0 +1,724 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import os
import pytest
import sys
import tempfile
import threading
import time
import numpy as np
import redis
import ray
import ray.ray_constants as ray_constants
from ray.utils import _random_string
from ray.tests.cluster_utils import Cluster
def relevant_errors(error_type):
    """Return the entries of ``ray.error_info()`` whose type is *error_type*."""
    return [
        error for error in ray.error_info() if error["type"] == error_type
    ]
def wait_for_errors(error_type, num_errors, timeout=10):
    """Block until at least *num_errors* errors of *error_type* exist.

    Polls ``relevant_errors`` every 0.1 seconds.

    Args:
        error_type: Error type to count.
        num_errors: Minimum number of matching errors to wait for.
        timeout: Maximum number of seconds to keep polling.

    Raises:
        Exception: If the count is not reached before the timeout.
    """
    began_at = time.time()
    while True:
        if not time.time() - began_at < timeout:
            break
        if len(relevant_errors(error_type)) >= num_errors:
            return
        time.sleep(0.1)
    raise Exception("Timing out of wait.")
@pytest.fixture
def ray_start_regular():
    """Run the test against a two-CPU Ray instance; shut it down afterwards."""
    ray.init(num_cpus=2)
    yield
    # Teardown.
    ray.shutdown()
@pytest.fixture
def shutdown_only():
    """Leave starting Ray to the test itself; only shut it down afterwards."""
    yield
    # Teardown.
    ray.shutdown()
def test_failed_task(ray_start_regular):
    """Failing remote functions push errors and make ``ray.get`` raise."""

    @ray.remote
    def throw_exception_fct1():
        raise Exception("Test function 1 intentionally failed.")

    @ray.remote
    def throw_exception_fct2():
        raise Exception("Test function 2 intentionally failed.")

    @ray.remote(num_return_vals=3)
    def throw_exception_fct3(x):
        raise Exception("Test function 3 intentionally failed.")

    # Two failed invocations should push exactly two task errors.
    throw_exception_fct1.remote()
    throw_exception_fct1.remote()
    wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2)
    assert len(relevant_errors(ray_constants.TASK_PUSH_ERROR)) == 2
    for task in relevant_errors(ray_constants.TASK_PUSH_ERROR):
        msg = task.get("message")
        assert "Test function 1 intentionally failed." in msg

    # Getting the result of a failed task must raise with the task's
    # message. pytest.raises fails the test if nothing is raised.
    x = throw_exception_fct2.remote()
    with pytest.raises(Exception) as excinfo:
        ray.get(x)
    assert "Test function 2 intentionally failed." in str(excinfo.value)

    # Every return value of a failed multi-return task must raise.
    x, y, z = throw_exception_fct3.remote(1.0)
    for ref in [x, y, z]:
        with pytest.raises(Exception) as excinfo:
            ray.get(ref)
        assert "Test function 3 intentionally failed." in str(excinfo.value)

    @ray.remote
    def f():
        raise Exception("This function failed.")

    with pytest.raises(Exception) as excinfo:
        ray.get(f.remote())
    assert "This function failed." in str(excinfo.value)
def test_fail_importing_remote_function(ray_start_regular):
    """A remote function closing over a driver-only module fails to import.

    The temporary module's directory is only added to this process's
    ``sys.path``, so the function cannot be unpickled elsewhere.
    """
    # Create the contents of a temporary Python file.
    temporary_python_file = """
def temporary_helper_function():
    return 1
"""

    # The context manager removes the temporary file even if an assertion
    # below fails.
    with tempfile.NamedTemporaryFile(suffix=".py") as f:
        f.write(temporary_python_file.encode("ascii"))
        f.flush()
        directory = os.path.dirname(f.name)
        # Get the module name and strip ".py" from the end.
        module_name = os.path.basename(f.name)[:-3]
        sys.path.append(directory)
        try:
            module = __import__(module_name)

            # Define a function that closes over this temporary module. This
            # should fail when it is unpickled.
            @ray.remote
            def g():
                return module.temporary_helper_function()

            wait_for_errors(
                ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR, 2)
            errors = relevant_errors(
                ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR)
            assert len(errors) == 2
            assert "No module named" in errors[0]["message"]
            assert "No module named" in errors[1]["message"]

            # Check that if we try to call the function it throws an
            # exception and does not hang.
            for _ in range(10):
                with pytest.raises(Exception):
                    ray.get(g.remote())
        finally:
            # Always undo the sys.path change, and remove our own entry
            # rather than whatever happens to be last.
            sys.path.remove(directory)
def test_failed_function_to_run(ray_start_regular):
    """A failing run-on-all-workers function pushes one error per worker."""

    def f(worker):
        # Only raise when the process running this is in worker mode.
        if ray.worker.global_worker.mode == ray.WORKER_MODE:
            raise Exception("Function to run failed.")

    ray.worker.global_worker.run_function_on_all_workers(f)
    wait_for_errors(ray_constants.FUNCTION_TO_RUN_PUSH_ERROR, 2)

    # Both pushed errors must carry the exception message.
    pushed = relevant_errors(ray_constants.FUNCTION_TO_RUN_PUSH_ERROR)
    assert len(pushed) == 2
    for error in pushed:
        assert "Function to run failed." in error["message"]
def test_fail_importing_actor(ray_start_regular):
    """An actor closing over a driver-only module fails to import.

    The temporary module's directory is only added to this process's
    ``sys.path``, so the actor class cannot be unpickled elsewhere.
    """
    # Create the contents of a temporary Python file.
    temporary_python_file = """
def temporary_helper_function():
    return 1
"""
    import_failure = (
        "failed to be imported, and so cannot execute this method")

    # The context manager removes the temporary file even if an assertion
    # below fails.
    with tempfile.NamedTemporaryFile(suffix=".py") as f:
        f.write(temporary_python_file.encode("ascii"))
        f.flush()
        directory = os.path.dirname(f.name)
        # Get the module name and strip ".py" from the end.
        module_name = os.path.basename(f.name)[:-3]
        sys.path.append(directory)
        try:
            module = __import__(module_name)

            # Define an actor that closes over this temporary module. This
            # should fail when it is unpickled.
            @ray.remote
            class Foo(object):
                def __init__(self):
                    self.x = module.temporary_helper_function()

                def get_val(self):
                    return 1

            # There should be no errors yet.
            assert len(ray.error_info()) == 0

            # Create an actor.
            foo = Foo.remote()

            # Wait for the error to arrive.
            wait_for_errors(ray_constants.REGISTER_ACTOR_PUSH_ERROR, 1)
            errors = relevant_errors(ray_constants.REGISTER_ACTOR_PUSH_ERROR)
            assert "No module named" in errors[0]["message"]

            # Wait for the error from when the __init__ tries to run.
            wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1)
            errors = relevant_errors(ray_constants.TASK_PUSH_ERROR)
            assert import_failure in errors[0]["message"]

            # Check that if we try to get the function it throws an
            # exception and does not hang.
            with pytest.raises(Exception):
                ray.get(foo.get_val.remote())

            # Wait for the error from the call to get_val.
            wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2)
            errors = relevant_errors(ray_constants.TASK_PUSH_ERROR)
            assert import_failure in errors[1]["message"]
        finally:
            # Always undo the sys.path change, and remove our own entry
            # rather than whatever happens to be last.
            sys.path.remove(directory)
def test_failed_actor_init(ray_start_regular):
    """A failing actor constructor is reported, also for later methods."""
    error_message1 = "actor constructor failed"
    error_message2 = "actor method failed"

    @ray.remote
    class FailedActor(object):
        def __init__(self):
            raise Exception(error_message1)

        def fail_method(self):
            raise Exception(error_message2)

    actor = FailedActor.remote()

    # The failed constructor must push exactly one error.
    wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1)
    pushed = relevant_errors(ray_constants.TASK_PUSH_ERROR)
    assert len(pushed) == 1
    assert error_message1 in pushed[0]["message"]

    # Calling a method afterwards pushes a second error; this test expects
    # it to carry the constructor's message, not the method's.
    actor.fail_method.remote()
    wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2)
    pushed = relevant_errors(ray_constants.TASK_PUSH_ERROR)
    assert len(pushed) == 2
    assert error_message1 in pushed[1]["message"]
def test_failed_actor_method(ray_start_regular):
    """A failing actor method pushes one error with the method's message."""
    error_message2 = "actor method failed"

    @ray.remote
    class FailedActor(object):
        def __init__(self):
            pass

        def fail_method(self):
            raise Exception(error_message2)

    actor = FailedActor.remote()

    # The failed method call must push exactly one error.
    actor.fail_method.remote()
    wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1)
    pushed = relevant_errors(ray_constants.TASK_PUSH_ERROR)
    assert len(pushed) == 1
    assert error_message2 in pushed[0]["message"]
def test_incorrect_method_calls(ray_start_regular):
    """Wrong argument counts and unknown actor methods raise immediately."""

    @ray.remote
    class Actor(object):
        def __init__(self, missing_variable_name):
            pass

        def get_val(self, x):
            pass

    # Constructor called with too few arguments.
    with pytest.raises(Exception):
        a = Actor.remote()

    # Constructor called with too many arguments.
    with pytest.raises(Exception):
        a = Actor.remote(1, 2)

    # Constructor called with the correct number of arguments.
    a = Actor.remote(1)

    # Method called with too few arguments.
    with pytest.raises(Exception):
        a.get_val.remote()

    # Method called with too many arguments.
    with pytest.raises(Exception):
        a.get_val.remote(1, 2)

    # A method that doesn't exist, called directly and through ``.remote``.
    with pytest.raises(AttributeError):
        a.nonexistent_method()
    with pytest.raises(AttributeError):
        a.nonexistent_method.remote()
def test_worker_raising_exception(ray_start_regular):
    """A worker that breaks after finishing a task reports crash and death."""

    @ray.remote
    def f():
        # Replace the worker's task-fetching hook with None.
        ray.worker.global_worker._get_next_task_from_local_scheduler = None

    # Running this task should cause the worker to raise an exception after
    # the task itself has completed successfully.
    f.remote()

    wait_for_errors(ray_constants.WORKER_CRASH_PUSH_ERROR, 1)
    wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1)
def test_worker_dying(ray_start_regular):
    """A task that exits its worker raises ``RayWorkerError`` on get."""

    # A remote function that exits the worker process running it.
    @ray.remote
    def f():
        eval("exit()")

    with pytest.raises(ray.exceptions.RayWorkerError):
        ray.get(f.remote())

    # Exactly one worker-died error must be pushed, with the expected text.
    wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1)
    pushed = relevant_errors(ray_constants.WORKER_DIED_PUSH_ERROR)
    assert len(pushed) == 1
    assert "died or was killed while executing" in pushed[0]["message"]
def test_actor_worker_dying(ray_start_regular):
    """Results of a dead actor raise, as do tasks that depend on them."""

    @ray.remote
    class Actor(object):
        def kill(self):
            eval("exit()")

    @ray.remote
    def consume(x):
        pass

    actor = Actor.remote()
    # The kill call's result must become ready within five seconds.
    [dead_result], _ = ray.wait([actor.kill.remote()], timeout=5.0)

    # Getting the result directly reports the actor failure ...
    with pytest.raises(ray.exceptions.RayActorError):
        ray.get(dead_result)
    # ... and a task consuming it fails as a task error.
    with pytest.raises(ray.exceptions.RayTaskError):
        ray.get(consume.remote(dead_result))

    wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1)
def test_actor_worker_dying_future_tasks(ray_start_regular):
    """Tasks queued before and after an actor is killed all fail."""

    @ray.remote
    class Actor(object):
        def getpid(self):
            return os.getpid()

        def sleep(self):
            time.sleep(1)

    actor = Actor.remote()
    actor_pid = ray.get(actor.getpid.remote())

    # Queue work, kill the actor process with signal 9, then queue more.
    submitted_before = [actor.sleep.remote() for _ in range(10)]
    os.kill(actor_pid, 9)
    time.sleep(0.1)
    submitted_after = [actor.sleep.remote() for _ in range(10)]

    # Every one of those calls must raise when fetched.
    for result in submitted_before + submitted_after:
        with pytest.raises(Exception):
            ray.get(result)

    wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1)
def test_actor_worker_dying_nothing_in_progress(ray_start_regular):
    """A call submitted after an idle actor was killed fails on get."""

    @ray.remote
    class Actor(object):
        def getpid(self):
            return os.getpid()

    actor = Actor.remote()
    actor_pid = ray.get(actor.getpid.remote())

    # Kill the actor process with signal 9 while it has nothing queued.
    os.kill(actor_pid, 9)
    time.sleep(0.1)

    late_call = actor.getpid.remote()
    with pytest.raises(Exception):
        ray.get(late_call)
def test_actor_scope_or_intentionally_killed_message(ray_start_regular):
    """Dropping or explicitly terminating an actor must not push errors."""

    @ray.remote
    class Actor(object):
        pass

    # Rebinding ``a`` drops the only reference to the first actor handle;
    # the second actor is terminated explicitly.
    a = Actor.remote()
    a = Actor.remote()
    a.__ray_terminate__.remote()
    time.sleep(1)

    reported = ray.error_info()
    assert len(reported) == 0, (
        "Should not have propogated an error - {}".format(ray.error_info()))
@pytest.fixture
def ray_start_object_store_memory():
    """Run the test on one CPU with ``object_store_memory`` set to 10**6."""
    store_size = 10**6
    ray.init(num_cpus=1, object_store_memory=store_size)
    yield
    # Teardown.
    ray.shutdown()
@pytest.mark.skip("This test does not work yet.")
def test_put_error1(ray_start_object_store_memory):
    """An evicted task argument that cannot be rebuilt pushes an error."""
    num_objects = 3
    object_size = 4 * 10**5

    # A task with a single numpy array dependency that returns a modified
    # copy of that array.
    @ray.remote
    def single_dependency(i, arg):
        arg = np.copy(arg)
        arg[0] = i
        return arg

    @ray.remote
    def put_arg_task():
        # Launch num_objects instances of the remote task, each dependent
        # on the one before it. The result of the first task should get
        # evicted.
        args = []
        arg = single_dependency.remote(0, np.zeros(
            object_size, dtype=np.uint8))
        for i in range(num_objects):
            arg = single_dependency.remote(i, arg)
            args.append(arg)

        # Get the last value to force all tasks to finish.
        value = ray.get(args[-1])
        assert value[0] == i

        # Get the first value (which should have been evicted) to force
        # reconstruction. Currently, since we're not able to reconstruct
        # `ray.put` objects that were evicted and whose originating tasks
        # are still running, this call should hang and push an error to
        # the driver.
        ray.get(args[0])

    put_arg_task.remote()

    # Make sure we receive the correct error message.
    wait_for_errors(ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR, 1)
@pytest.mark.skip("This test does not work yet.")
def test_put_error2(ray_start_object_store_memory):
    """Like ``test_put_error1``, but the first object is put explicitly."""
    num_objects = 3
    object_size = 4 * 10**5

    # A task with a single numpy array dependency that returns a modified
    # copy of that array.
    @ray.remote
    def single_dependency(i, arg):
        arg = np.copy(arg)
        arg[0] = i
        return arg

    @ray.remote
    def put_task():
        # Launch num_objects instances of the remote task, each dependent
        # on the one before it. The result of the first task should get
        # evicted.
        args = []
        arg = ray.put(np.zeros(object_size, dtype=np.uint8))
        for i in range(num_objects):
            arg = single_dependency.remote(i, arg)
            args.append(arg)

        # Get the last value to force all tasks to finish.
        value = ray.get(args[-1])
        assert value[0] == i

        # Get the first value (which should have been evicted) to force
        # reconstruction. Currently, since we're not able to reconstruct
        # `ray.put` objects that were evicted and whose originating tasks
        # are still running, this call should hang and push an error to
        # the driver.
        ray.get(args[0])

    put_task.remote()

    # Make sure we receive the correct error message.
    wait_for_errors(ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR, 1)
def test_version_mismatch(shutdown_only):
    """Starting Ray with a fake driver version pushes a mismatch error."""
    real_version = ray.__version__
    ray.__version__ = "fake ray version"
    try:
        ray.init(num_cpus=1)
        wait_for_errors(ray_constants.VERSION_MISMATCH_PUSH_ERROR, 1)
    finally:
        # Restore the real version even when the checks above fail, so the
        # fake value cannot leak into later tests in the same process.
        ray.__version__ = real_version
def test_warning_monitor_died(shutdown_only):
    """A malformed heartbeat-batch entry makes the monitor report its death."""
    ray.init(num_cpus=0)

    time.sleep(1)  # Make sure the monitor has started.

    # Cause the monitor to raise an exception by pushing a malformed message
    # to Redis. This will probably kill the raylets and the raylet_monitor
    # in addition to the monitor.
    fake_id = 20 * b"\x00"
    malformed_message = "asdf"
    gcs = ray.gcs_utils
    ray.worker.global_worker.redis_client.execute_command(
        "RAY.TABLE_ADD",
        gcs.TablePrefix.HEARTBEAT_BATCH,
        gcs.TablePubsub.HEARTBEAT_BATCH,
        fake_id,
        malformed_message)

    wait_for_errors(ray_constants.MONITOR_DIED_ERROR, 1)
def test_export_large_objects(ray_start_regular):
    """Exporting definitions that capture a large object pushes warnings."""
    # Function-scope import kept from the original; this file also imports
    # ray_constants at module level.
    import ray.ray_constants as ray_constants

    # An array with twice as many elements as the warning size constant.
    large_object = np.zeros(2 * ray_constants.PICKLE_OBJECT_WARNING_SIZE)

    @ray.remote
    def f():
        # Referencing the name makes the function capture the array.
        large_object

    # Defining the remote function must already generate a warning.
    wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 1)

    @ray.remote
    class Foo(object):
        def __init__(self):
            # Referencing the name makes the class capture the array.
            large_object

    Foo.remote()

    # Creating the actor must generate a second warning.
    wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 2)
def test_warning_for_infeasible_tasks(ray_start_regular):
    """Tasks and actors asking for unavailable resources push warnings."""

    # Requests a GPU; the fixture starts Ray with ``num_cpus=2`` only.
    @ray.remote(num_gpus=1)
    def f():
        pass

    # Requests a custom resource that the fixture never declares.
    @ray.remote(resources={"Custom": 1})
    class Foo(object):
        pass

    # This task is infeasible.
    f.remote()
    wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 1)

    # This actor placement task is infeasible as well.
    Foo.remote()
    wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 2)
def test_warning_for_infeasible_zero_cpu_actor(shutdown_only):
    """An actor cannot be placed on a machine that has zero CPUs.

    An infeasibility warning is expected even though the actor creation
    task itself requires no CPUs.
    """
    ray.init(num_cpus=0)

    @ray.remote
    class Foo(object):
        pass

    # Creating the actor should be reported as infeasible.
    Foo.remote()
    wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 1)
def test_warning_for_too_many_actors(shutdown_only):
    """Creating far more actors than CPUs pushes worker-pool warnings."""
    cpu_count = 2
    ray.init(num_cpus=cpu_count)

    @ray.remote
    class Foo(object):
        def __init__(self):
            time.sleep(1000)

    # Three actors per CPU should trigger the first warning ...
    [Foo.remote() for _ in range(cpu_count * 3)]
    wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1)

    # ... and one more actor per CPU should trigger a second one.
    [Foo.remote() for _ in range(cpu_count)]
    wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 2)
def test_warning_for_too_many_nested_tasks(shutdown_only):
    """Many tasks blocked on nested tasks push a worker-pool warning."""
    cpu_count = 2
    ray.init(num_cpus=cpu_count)

    @ray.remote
    def f():
        time.sleep(1000)
        return 1

    @ray.remote
    def g():
        # Sleep so that the f tasks all get submitted to the scheduler after
        # the g tasks.
        time.sleep(1)
        ray.get(f.remote())

    # Four blocking tasks per CPU.
    [g.remote() for _ in range(cpu_count * 4)]
    wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1)
def test_redis_module_failure(shutdown_only):
    """Malformed ``RAY.TABLE_*`` commands are rejected with clear errors."""
    address_info = ray.init(num_cpus=1)
    address_parts = address_info["redis_address"].split(":")
    assert len(address_parts) == 2

    def new_client():
        # A fresh client for every command.
        return redis.StrictRedis(
            host=address_parts[0], port=int(address_parts[1]))

    def run_failure_test(expecting_message, *command):
        # The command must fail with an error containing the message.
        with pytest.raises(
                Exception, match=".*{}.*".format(expecting_message)):
            client = new_client()
            client.execute_command(*command)

    def run_one_command(*command):
        # The command must succeed.
        client = new_client()
        client.execute_command(*command)

    run_failure_test("wrong number of arguments", "RAY.TABLE_ADD", 13)
    run_failure_test("Prefix must be in the TablePrefix range",
                     "RAY.TABLE_ADD", 100000, 1, 1, 1)
    run_failure_test("Prefix must be in the TablePrefix range",
                     "RAY.TABLE_REQUEST_NOTIFICATIONS", 100000, 1, 1, 1)
    run_failure_test("Prefix must be a valid TablePrefix integer",
                     "RAY.TABLE_ADD", b"a", 1, 1, 1)
    run_failure_test("Pubsub channel must be in the TablePubsub range",
                     "RAY.TABLE_ADD", 1, 10000, 1, 1)
    run_failure_test("Pubsub channel must be a valid integer",
                     "RAY.TABLE_ADD", 1, b"a", 1, 1)

    # Change the key from 1 to 2, since the previous command should have
    # succeeded at writing the key, but not publishing it.
    run_failure_test("Index is less than 0.",
                     "RAY.TABLE_APPEND", 1, 1, 2, 1, -1)
    run_failure_test("Index is not a number.",
                     "RAY.TABLE_APPEND", 1, 1, 2, 1, b"a")
    run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1)
    # It's okay to add duplicate entries.
    run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1)
    run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1, 0)
    run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1, 1)
@pytest.fixture
def ray_start_two_nodes():
    """Start a two-node, zero-CPU cluster and yield it.

    Both nodes get ``num_heartbeats_timeout`` set to 40. The driver
    connection and the cluster are shut down on teardown.
    """
    internal_config = json.dumps({"num_heartbeats_timeout": 40})

    cluster = Cluster()
    for _ in range(2):
        cluster.add_node(num_cpus=0, _internal_config=internal_config)
    ray.init(redis_address=cluster.redis_address)

    yield cluster

    # Teardown: disconnect the driver, then stop the cluster.
    ray.shutdown()
    cluster.shutdown()
# Note that this test will take at least 10 seconds because it must wait for
# the monitor to detect enough missed heartbeats.
def test_warning_for_dead_node(ray_start_two_nodes):
    """Killing both raylets pushes one removed-node warning per client."""
    cluster = ray_start_two_nodes
    cluster.wait_for_nodes()

    expected_client_ids = {
        entry["ClientID"] for entry in ray.global_state.client_table()
    }

    # Try to make sure that the monitor has received at least one heartbeat
    # from the node.
    time.sleep(0.5)

    # Kill both raylets, the second node's first.
    cluster.list_all_nodes()[1].kill_raylet()
    cluster.list_all_nodes()[0].kill_raylet()

    # A warning must arrive for each of the two raylets.
    wait_for_errors(ray_constants.REMOVED_NODE_ERROR, 2, timeout=40)

    # The client ID is taken from the sixth space-separated word of each
    # message, so this needs to change if the error message changes.
    reported_client_ids = {
        error["message"].split(" ")[5]
        for error in relevant_errors(ray_constants.REMOVED_NODE_ERROR)
    }

    assert expected_client_ids == reported_client_ids
def test_raylet_crash_when_get(ray_start_regular):
    """A ``ray.get`` blocked while the raylet is killed raises an error."""
    nonexistent_id = ray.ObjectID(_random_string())

    def sleep_to_kill_raylet():
        # Don't kill raylet before default workers get connected.
        time.sleep(2)
        ray.worker._global_node.kill_raylet()

    # Kill the raylet from a background thread while the main thread is
    # blocked getting an object that will never exist.
    killer = threading.Thread(target=sleep_to_kill_raylet)
    killer.start()
    with pytest.raises(
            Exception, match=r".*Connection closed unexpectedly.*"):
        ray.get(nonexistent_id)
    killer.join()
@@ -11,7 +11,7 @@ except ImportError:
import time
import ray
from ray.test.cluster_utils import Cluster
from ray.tests.cluster_utils import Cluster
@pytest.fixture
+119
View File
@@ -0,0 +1,119 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import os
import ray
import time
import numpy as np
@pytest.fixture
def ray_start_regular():
    """Run the test against a three-CPU Ray instance; shut it down after."""
    ray.init(num_cpus=3)
    yield
    # Teardown.
    ray.shutdown()
def _time_calls(operation, num_calls=1000):
    """Return the sorted wall-clock durations of repeated calls.

    Args:
        operation: Zero-argument callable to time.
        num_calls: Number of times ``operation`` is invoked.

    Returns:
        A sorted numpy array with one elapsed time per call, as measured by
        ``time.time``.
    """
    elapsed_times = []
    for _ in range(num_calls):
        start_time = time.time()
        operation()
        end_time = time.time()
        elapsed_times.append(end_time - start_time)
    return np.sort(elapsed_times)


def _print_timing_summary(title, elapsed_times):
    """Print the average, 90th/99th percentile and worst sorted timing.

    Args:
        title: Heading line printed before the statistics.
        elapsed_times: Non-empty, ascending sequence of durations.
    """
    num_calls = len(elapsed_times)
    # For 1000 samples these are indices 900 and 990.
    index_90th = num_calls * 9 // 10
    index_99th = num_calls * 99 // 100
    print(title)
    print(" Average: {}".format(sum(elapsed_times) / num_calls))
    print(" 90th percentile: {}".format(elapsed_times[index_90th]))
    print(" 99th percentile: {}".format(elapsed_times[index_99th]))
    print(" worst: {}".format(elapsed_times[-1]))


def test_timing(ray_start_regular):
    """Print latency statistics for task submission, get and put.

    Nothing is asserted; the numbers in the comments are the averages the
    original author expected to see.
    """

    @ray.remote
    def empty_function():
        pass

    @ray.remote
    def trivial_function():
        return 1

    # Measure the time required to submit a remote task to the scheduler.
    _print_timing_summary(
        "Time required to submit an empty function call:",
        _time_calls(empty_function.remote))
    # average_elapsed_time should be about 0.00038.

    # Measure the time required to submit a remote task to the scheduler
    # (where the remote task returns one value).
    _print_timing_summary(
        "Time required to submit a trivial function call:",
        _time_calls(trivial_function.remote))
    # average_elapsed_time should be about 0.001.

    # Measure the time required to submit a remote task to the scheduler
    # and get the result.
    _print_timing_summary(
        "Time required to submit a trivial function call and get the "
        "result:",
        _time_calls(lambda: ray.get(trivial_function.remote())))
    # average_elapsed_time should be about 0.0013.

    # Measure the time required to do a put.
    _print_timing_summary(
        "Time required to put an int:",
        _time_calls(lambda: ray.put(1)))
    # average_elapsed_time should be about 0.00087.
def test_cache(ray_start_regular):
    """Compare a local dot product with one on arrays fetched via ray.get.

    Raises if 100 fetch-and-multiply rounds take more than 1.5x as long as
    100 purely local multiplications; when the ``TRAVIS`` environment
    variable is set, only a warning is printed instead.
    """
    num_rounds = 100
    matrix = np.random.rand(1, 1000000)
    vector = np.random.rand(1000000)
    matrix_id = ray.put(matrix)
    vector_id = ray.put(vector)

    # Baseline: multiply the in-process arrays.
    local_start = time.time()
    for _ in range(num_rounds):
        matrix.dot(vector)
    b = time.time() - local_start

    # Same computation, fetching both operands from the object store.
    ray_start = time.time()
    for _ in range(num_rounds):
        ray.get(matrix_id).dot(ray.get(vector_id))
    d = time.time() - ray_start

    if d <= 1.5 * b:
        return

    message = ("The caching test was too slow. "
               "d = {}, b = {}".format(d, b))
    if os.getenv("TRAVIS") is None:
        raise Exception(message)
    print("WARNING: " + message)
+75
View File
@@ -0,0 +1,75 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import ray
test_values = [1, 1.0, "test", b"test", (0, 1), [0, 1], {0: 1}]
@pytest.fixture
def ray_start():
    """Run the test against a local Ray instance started with 1 CPU."""
    ray.init(num_cpus=1)
    # Everything after the yield runs as teardown code.
    yield
    ray.shutdown()
def test_basic_task_api(ray_start):
    """Exercise remote functions: plain call, multiple returns, by-value args.
    """

    # A simple function with no arguments.
    @ray.remote
    def f_simple():
        return 1

    simple_result = ray.get(f_simple.remote())
    assert simple_result == 1

    # Multiple return values come back as one object ID each.
    @ray.remote(num_return_vals=3)
    def f_multiple_returns():
        return 1, 2, 3

    first_id, second_id, third_id = f_multiple_returns.remote()
    assert ray.get([first_id, second_id, third_id]) == [1, 2, 3]

    # Arguments passed by value must round-trip unchanged.
    @ray.remote
    def f_args_by_value(x):
        return x

    for value in test_values:
        echoed = ray.get(f_args_by_value.remote(value))
        assert echoed == value

    # Test arguments passed by ID.

    # Test keyword arguments.
def test_put_api(ray_start):
    """Check that values survive a ``ray.put`` / ``ray.get`` round trip."""
    for value in test_values:
        stored_id = ray.put(value)
        assert ray.get(stored_id) == value

    # Test putting object IDs, nested in a list, a tuple and a dict.
    x_id = ray.put(0)
    containers = [[x_id], (x_id, ), {x_id: x_id}]
    for container in containers:
        stored_id = ray.put(container)
        assert ray.get(stored_id) == container
def test_actor_api(ray_start):
    """Check that an actor returns the value it was constructed with."""

    @ray.remote
    class Foo(object):
        def __init__(self, val):
            self.x = val

        def get(self):
            return self.x

    initial_value = 1
    actor = Foo.remote(initial_value)
    fetched_value = ray.get(actor.get.remote())
    assert fetched_value == initial_value
+116
View File
@@ -0,0 +1,116 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import multiprocessing
import os
import pytest
import subprocess
import time
import ray
from ray.tests.utils import run_and_get_output
def _test_cleanup_on_driver_exit(num_redis_shards):
    """Check that a driver's global state is cleaned up after it exits.

    Starts a head node with ``ray start``, runs a driver in a child process
    that creates objects, a task and a remote function, and then checks from
    a second driver that the object and task tables are back to sizes
    ``(0, 1)``.

    The cluster is always stopped, even when an assertion fails, so that a
    failure here cannot leak a running cluster into later tests.

    Args:
        num_redis_shards: Value passed to ``--num-redis-shards``.
    """
    stdout = run_and_get_output([
        "ray",
        "start",
        "--head",
        "--num-redis-shards",
        str(num_redis_shards),
    ])
    connected = False
    try:
        lines = [m.strip() for m in stdout.split("\n")]
        init_cmd = [m for m in lines if m.startswith("ray.init")]
        assert 1 == len(init_cmd)
        # Strip the trailing two characters that follow the address in the
        # printed ``ray.init(...)`` command.
        redis_address = init_cmd[0].split("redis_address=\"")[-1][:-2]
        max_attempts_before_failing = 100

        # Wait for monitor.py to start working.
        time.sleep(2)

        def state_summary():
            obj_tbl_len = len(ray.global_state.object_table())
            task_tbl_len = len(ray.global_state.task_table())
            func_tbl_len = len(ray.global_state.function_table())
            return obj_tbl_len, task_tbl_len, func_tbl_len

        def wait_for_state(expected):
            """Poll until the leading summary fields equal ``expected``.

            Returns False if they still differ after the maximum number of
            attempts.
            """
            for _ in range(max_attempts_before_failing):
                if state_summary()[:len(expected)] == expected:
                    return True
                time.sleep(0.1)
            return False

        def run_driver(success):
            # ``success`` is only set once every step has run, so a driver
            # that crashes part-way is reported as a failure.
            ok = True
            # Start driver.
            ray.init(redis_address=redis_address)
            summary_start = state_summary()
            if (0, 1) != summary_start[:2]:
                ok = False

            # Two new objects.
            ray.get(ray.put(1111))
            ray.get(ray.put(1111))
            if not wait_for_state((2, 1, summary_start[2])):
                ok = False

            @ray.remote
            def f():
                ray.put(1111)  # Yet another object.
                return 1111  # A returned object as well.

            # 1 new function.
            if not wait_for_state((2, 1, summary_start[2] + 1)):
                ok = False

            ray.get(f.remote())
            if not wait_for_state((4, 2, summary_start[2] + 1)):
                ok = False

            ray.shutdown()
            success.value = ok

        success = multiprocessing.Value('b', False)
        driver = multiprocessing.Process(target=run_driver, args=(success, ))
        driver.start()
        # Wait for client to exit.
        driver.join()
        # Just make sure the driver ran to the end and succeeded.
        assert success.value

        # Check that objects, tasks, and functions are cleaned up.
        ray.init(redis_address=redis_address)
        connected = True
        wait_for_state((0, 1))
        assert (0, 1) == state_summary()[:2]
    finally:
        if connected:
            ray.shutdown()
        subprocess.Popen(["ray", "stop"]).wait()
@pytest.mark.skipif(
    os.getenv("RAY_USE_NEW_GCS") == "on",
    reason="Hanging with the new GCS API.")
def test_cleanup_on_driver_exit_single_redis_shard():
    """Run the driver-exit cleanup check with a single Redis shard."""
    _test_cleanup_on_driver_exit(num_redis_shards=1)
@pytest.mark.skipif(
    os.getenv("RAY_USE_NEW_GCS") == "on",
    reason="Hanging with the new GCS API.")
def test_cleanup_on_driver_exit_many_redis_shards():
    """Run the driver-exit cleanup check with 5 and then 31 Redis shards."""
    for shard_count in (5, 31):
        _test_cleanup_on_driver_exit(num_redis_shards=shard_count)
+456
View File
@@ -0,0 +1,456 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import pytest
import subprocess
import time
import ray
from ray.tests.utils import (run_and_get_output, run_string_as_driver,
run_string_as_driver_nonblocking)
@pytest.fixture
def ray_start_head():
    """Start a 2-CPU head node with ``ray start`` and yield its Redis address.
    """
    start_output = run_and_get_output(
        ["ray", "start", "--head", "--num-cpus=2"])

    # The address is the text between ``redis_address="`` and the next
    # double quote in the command output.
    marker = "redis_address=\""
    address_start = start_output.find(marker) + len(marker)
    redis_address = start_output[address_start:].split("\"")[0]

    yield redis_address

    # Teardown: disconnect this process, then kill the Ray cluster.
    ray.shutdown()
    subprocess.Popen(["ray", "stop"]).wait()
def test_error_isolation(ray_start_head):
    """Check that errors raised in one driver are not shown to another.

    This driver and a second driver (run as a separate script) each run a
    failing task; each must see exactly its own error message.
    """
    redis_address = ray_start_head
    # Connect a driver to the Ray cluster.
    ray.init(redis_address=redis_address)

    # There shouldn't be any errors yet.
    assert len(ray.error_info()) == 0

    error_string1 = "error_string1"
    error_string2 = "error_string2"

    @ray.remote
    def f():
        raise Exception(error_string1)

    # Run a remote function that throws an error.
    with pytest.raises(Exception):
        ray.get(f.remote())

    # Wait for the error to appear in Redis.
    while len(ray.error_info()) != 1:
        time.sleep(0.1)
        print("Waiting for error to appear.")

    # Make sure we got the error.
    assert len(ray.error_info()) == 1
    assert error_string1 in ray.error_info()[0]["message"]

    # Start another driver and make sure that it does not receive this
    # error. Make the other driver throw an error, and make sure it
    # receives that error.
    # The script body is indented so that it parses as Python.
    driver_script = """
import ray
import time
ray.init(redis_address="{}")
time.sleep(1)
assert len(ray.error_info()) == 0
@ray.remote
def f():
    raise Exception("{}")
try:
    ray.get(f.remote())
except Exception as e:
    pass
while len(ray.error_info()) != 1:
    print(len(ray.error_info()))
    time.sleep(0.1)
assert len(ray.error_info()) == 1
assert "{}" in ray.error_info()[0]["message"]
print("success")
""".format(redis_address, error_string2, error_string2)

    out = run_string_as_driver(driver_script)
    # Make sure the other driver succeeded.
    assert "success" in out

    # Make sure that the other error message doesn't show up for this
    # driver.
    assert len(ray.error_info()) == 1
    assert error_string1 in ray.error_info()[0]["message"]
def test_remote_function_isolation(ray_start_head):
    """Check that two drivers can define remote functions with equal names.

    A second driver defines ``f`` and ``g`` returning 3 and 4; this driver
    then defines its own ``f`` and ``g`` returning 1 and 2 and must always
    get its own results back.
    """
    # This test will run multiple remote functions with the same names in
    # two different drivers. Connect a driver to the Ray cluster.
    redis_address = ray_start_head
    ray.init(redis_address=redis_address)

    # Start another driver and make sure that it can define and call its
    # own commands with the same names.
    # The script body is indented so that it parses as Python.
    driver_script = """
import ray
import time
ray.init(redis_address="{}")
@ray.remote
def f():
    return 3
@ray.remote
def g(x, y):
    return 4
for _ in range(10000):
    result = ray.get([f.remote(), g.remote(0, 0)])
    assert result == [3, 4]
print("success")
""".format(redis_address)

    out = run_string_as_driver(driver_script)

    @ray.remote
    def f():
        return 1

    @ray.remote
    def g(x):
        return 2

    for _ in range(10000):
        result = ray.get([f.remote(), g.remote(0)])
        assert result == [1, 2]

    # Make sure the other driver succeeded.
    assert "success" in out
def test_driver_exiting_quickly(ray_start_head):
    """Check that drivers can exit without waiting for their work to finish.

    Two short driver scripts (one creating an actor, one submitting a task)
    are each run three times and must print ``success`` every time.
    """
    # This test will create some drivers that submit some tasks and then
    # exit without waiting for the tasks to complete.
    redis_address = ray_start_head
    ray.init(redis_address=redis_address)

    # Define a driver that creates an actor and exits.
    # The script bodies are indented so that they parse as Python.
    driver_script1 = """
import ray
ray.init(redis_address="{}")
@ray.remote
class Foo(object):
    def __init__(self):
        pass
Foo.remote()
print("success")
""".format(redis_address)

    # Define a driver that creates some tasks and exits.
    driver_script2 = """
import ray
ray.init(redis_address="{}")
@ray.remote
def f():
    return 1
f.remote()
print("success")
""".format(redis_address)

    # Create some drivers and let them exit and make sure everything is
    # still alive.
    for _ in range(3):
        out = run_string_as_driver(driver_script1)
        # Make sure the first driver ran to completion.
        assert "success" in out
        out = run_string_as_driver(driver_script2)
        # Make sure the second driver ran to completion.
        assert "success" in out
@pytest.fixture
def ray_start_head_with_resources():
    """Start a head node with 1 CPU and 1 GPU and yield its Redis address."""
    start_output = run_and_get_output(
        ["ray", "start", "--head", "--num-cpus=1", "--num-gpus=1"])

    # The address is the text between ``redis_address="`` and the next
    # double quote in the command output.
    marker = "redis_address=\""
    address_start = start_output.find(marker) + len(marker)
    redis_address = start_output[address_start:].split("\"")[0]

    yield redis_address

    # Teardown: kill the Ray cluster.
    subprocess.Popen(["ray", "stop"]).wait()
def test_drivers_release_resources(ray_start_head_with_resources):
    """Check that resources held by a driver are released when it exits.

    The same resource-hungry driver script is run five times in a row; each
    run can only print ``success`` if the previous run's actors and tasks
    were cleaned up. After each graceful run, a variant that never exits on
    its own is started and killed.
    """
    redis_address = ray_start_head_with_resources

    # Define a driver that occupies resources with actors and long-running
    # tasks and then exits.
    # The script body is indented so that it parses as Python.
    driver_script1 = """
import time
import ray
ray.init(redis_address="{}")
@ray.remote
def f(duration):
    time.sleep(duration)
@ray.remote(num_gpus=1)
def g(duration):
    time.sleep(duration)
@ray.remote(num_gpus=1)
class Foo(object):
    def __init__(self):
        pass
# Make sure some resources are available for us to run tasks.
ray.get(f.remote(0))
ray.get(g.remote(0))
# Start a bunch of actors and tasks that use resources. These should all be
# cleaned up when this driver exits.
foos = [Foo.remote() for _ in range(100)]
[f.remote(10 ** 6) for _ in range(100)]
print("success")
""".format(redis_address)

    # Same script, but it flushes its output and then sleeps so that it has
    # to be killed from the outside.
    driver_script2 = (driver_script1 +
                      "import sys\nsys.stdout.flush()\ntime.sleep(10 ** 6)\n")

    def wait_for_success_output(process_handle, timeout=10):
        # Wait until the process prints "success" and then return.
        start_time = time.time()
        while time.time() - start_time < timeout:
            output_line = ray.utils.decode(
                process_handle.stdout.readline()).strip()
            print(output_line)
            if output_line == "success":
                return
        raise Exception("Timed out waiting for process to print success.")

    # Make sure we can run this driver repeatedly, which means that resources
    # are getting released in between.
    for _ in range(5):
        out = run_string_as_driver(driver_script1)
        # Make sure the first driver ran to completion.
        assert "success" in out
        # Also make sure that this works when the driver exits ungracefully.
        process_handle = run_string_as_driver_nonblocking(driver_script2)
        wait_for_success_output(process_handle)
        # Kill the process ungracefully.
        process_handle.kill()
def test_calling_start_ray_head():
    """Start and stop a head node with a range of command line arguments.

    Each ``ray start --head ...`` invocation is followed by ``ray stop``.
    The final invocation passes ``--redis-address`` together with ``--head``
    and is expected to raise.
    """
    # Test that we can call start-ray.sh with various command line
    # parameters. TODO(rkn): This test only tests the --head code path. We
    # should also test the non-head node code path.

    # Test starting Ray with no arguments.
    run_and_get_output(["ray", "start", "--head"])
    subprocess.Popen(["ray", "stop"]).wait()

    # Test starting Ray with a redis port specified.
    run_and_get_output(["ray", "start", "--head", "--redis-port", "6379"])
    subprocess.Popen(["ray", "stop"]).wait()

    # Test starting Ray with a node IP address specified.
    run_and_get_output(
        ["ray", "start", "--head", "--node-ip-address", "127.0.0.1"])
    subprocess.Popen(["ray", "stop"]).wait()

    # Test starting Ray with the object manager and node manager ports
    # specified.
    run_and_get_output([
        "ray", "start", "--head", "--object-manager-port", "12345",
        "--node-manager-port", "54321"
    ])
    subprocess.Popen(["ray", "stop"]).wait()

    # Test starting Ray with the number of CPUs specified.
    run_and_get_output(["ray", "start", "--head", "--num-cpus", "2"])
    subprocess.Popen(["ray", "stop"]).wait()

    # Test starting Ray with the number of GPUs specified.
    run_and_get_output(["ray", "start", "--head", "--num-gpus", "100"])
    subprocess.Popen(["ray", "stop"]).wait()

    # Test starting Ray with the max redis clients specified.
    run_and_get_output(
        ["ray", "start", "--head", "--redis-max-clients", "100"])
    subprocess.Popen(["ray", "stop"]).wait()

    # NOTE(review): the extent of this ``if`` body is reconstructed. Both
    # invocations that pass --redis-shard-ports are assumed to be guarded by
    # it -- confirm against the upstream file.
    if "RAY_USE_NEW_GCS" not in os.environ:
        # Test starting Ray with redis shard ports specified.
        run_and_get_output([
            "ray", "start", "--head", "--redis-shard-ports", "6380,6381,6382"
        ])
        subprocess.Popen(["ray", "stop"]).wait()

        # Test starting Ray with all arguments specified.
        run_and_get_output([
            "ray", "start", "--head", "--redis-port", "6379",
            "--redis-shard-ports", "6380,6381,6382", "--object-manager-port",
            "12345", "--num-cpus", "2", "--num-gpus", "0",
            "--redis-max-clients", "100", "--resources", "{\"Custom\": 1}"
        ])
        subprocess.Popen(["ray", "stop"]).wait()

    # Test starting Ray with invalid arguments.
    with pytest.raises(Exception):
        run_and_get_output(
            ["ray", "start", "--head", "--redis-address", "127.0.0.1:6379"])
    # Stop anyway, in case the failed start left processes behind.
    subprocess.Popen(["ray", "stop"]).wait()
@pytest.fixture
def ray_start_head_local():
    """Start a head node addressed as ``localhost`` on Redis port 6379."""
    start_command = [
        "ray", "start", "--head", "--node-ip-address=localhost",
        "--redis-port=6379"
    ]
    # Start the Ray processes on this machine.
    run_and_get_output(start_command)

    yield

    # Teardown: disconnect this process, then kill the Ray cluster.
    ray.shutdown()
    subprocess.Popen(["ray", "stop"]).wait()
def test_using_hostnames(ray_start_head_local):
    """Check that a driver can connect using a hostname instead of an IP."""
    ray.init(node_ip_address="localhost", redis_address="localhost:6379")

    @ray.remote
    def f():
        return 1

    result = ray.get(f.remote())
    assert result == 1
@pytest.fixture
def ray_start_regular():
    """Start a 1-CPU local Ray instance and yield what ``ray.init`` returns.
    """
    address_info = ray.init(num_cpus=1)
    yield address_info
    # Teardown: everything after the yield runs once the test is done.
    ray.shutdown()
def test_connecting_in_local_case(ray_start_regular):
    """Check that a second driver can attach to a ``ray.init`` cluster."""
    redis_address = ray_start_regular["redis_address"]

    # Define a driver that just connects to Redis.
    driver_script = """
import ray
ray.init(redis_address="{}")
print("success")
""".format(redis_address)

    driver_output = run_string_as_driver(driver_script)
    # Make sure the other driver succeeded.
    assert "success" in driver_output
def test_run_driver_twice(ray_start_regular):
    """Check that the same Tune driver script succeeds when run twice.

    Regression test for drivers hanging on their second run.
    """
    # We used to have issue 2165 and 2288:
    # https://github.com/ray-project/ray/issues/2165
    # https://github.com/ray-project/ray/issues/2288
    # both complain that the driver will hang when run for the second time.
    # This test verifies the fix for those issues: it runs the same driver
    # twice and checks that both runs succeed.
    address_info = ray_start_regular
    # The script body is indented so that it parses as Python.
    driver_script = """
import ray
import ray.tune as tune
import os
import time
def train_func(config, reporter): # add a reporter arg
    for i in range(2):
        time.sleep(0.1)
        reporter(timesteps_total=i, mean_accuracy=i+97) # report metrics
os.environ["TUNE_RESUME_PROMPT_OFF"] = "True"
ray.init(redis_address="{}")
ray.tune.register_trainable("train_func", train_func)
tune.run_experiments({{
    "my_experiment": {{
        "run": "train_func",
        "stop": {{"mean_accuracy": 99}},
        "config": {{
            "layer1": {{
                "class_name": tune.grid_search(["a"]),
                "config": {{"lr": tune.grid_search([1, 2])}}
            }},
        }},
        "local_dir": os.path.expanduser("~/tmp")
    }}
}})
print("success")
""".format(address_info["redis_address"])

    for i in range(2):
        out = run_string_as_driver(driver_script)
        assert "success" in out
def test_driver_exiting_when_worker_blocked(ray_start_head):
    """Check that the cluster survives drivers that exit with blocked tasks.

    Each driver submits a task ``g`` that blocks in ``ray.get`` on a task
    ``f`` which sleeps for a very long time, and then exits. Afterwards this
    driver must still be able to run a task.
    """
    redis_address = ray_start_head
    ray.init(redis_address=redis_address)

    # Define a driver that leaves a worker blocked in ray.get and exits.
    # The script body is indented so that it parses as Python.
    driver_script = """
import time
import ray
ray.init(redis_address="{}")
@ray.remote
def f():
    time.sleep(10**6)
@ray.remote
def g():
    ray.get(f.remote())
g.remote()
time.sleep(1)
print("success")
""".format(redis_address)

    # Create some drivers and let them exit and make sure everything is
    # still alive.
    for _ in range(3):
        out = run_string_as_driver(driver_script)
        # Make sure the driver ran to completion.
        assert "success" in out

    @ray.remote
    def f():
        return 1

    # Make sure we can still talk with the raylet.
    ray.get(f.remote())
+121
View File
@@ -0,0 +1,121 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import logging
import pytest
import time
import ray
import ray.ray_constants as ray_constants
from ray.tests.cluster_utils import Cluster
logger = logging.getLogger(__name__)
@pytest.fixture
def start_connected_cluster():
    """Yield a connected one-CPU cluster with ``num_heartbeats_timeout=10``.
    """
    internal_config = json.dumps({"num_heartbeats_timeout": 10})
    head_node_args = {
        "num_cpus": 1,
        "_internal_config": internal_config,
    }
    # Start the Ray processes.
    cluster = Cluster(
        initialize_head=True, connect=True, head_node_args=head_node_args)
    yield cluster
    # Teardown: disconnect the driver, then stop the cluster.
    ray.shutdown()
    cluster.shutdown()
@pytest.fixture
def start_connected_longer_cluster():
    """Creates a cluster with a longer timeout.

    Same as a connected one-CPU cluster, but with
    ``num_heartbeats_timeout`` set to 20.
    """
    internal_config = json.dumps({"num_heartbeats_timeout": 20})
    head_node_args = {
        "num_cpus": 1,
        "_internal_config": internal_config,
    }
    cluster = Cluster(
        initialize_head=True, connect=True, head_node_args=head_node_args)
    yield cluster
    # Teardown: disconnect the driver, then stop the cluster.
    ray.shutdown()
    cluster.shutdown()
def test_cluster():
    """Basic test for adding and removing nodes in cluster."""
    cluster = Cluster(initialize_head=False)
    first_node = cluster.add_node()
    second_node = cluster.add_node()

    assert first_node.remaining_processes_alive()
    assert second_node.remaining_processes_alive()

    # Remove the nodes in reverse order of creation.
    for node in (second_node, first_node):
        cluster.remove_node(node)

    assert all(not node.any_processes_alive()
               for node in (first_node, second_node))
def test_shutdown():
    """Check that ``Cluster.shutdown`` leaves no node process alive."""
    cluster = Cluster(initialize_head=False)
    first_node = cluster.add_node()
    second_node = cluster.add_node()

    cluster.shutdown()

    assert all(not node.any_processes_alive()
               for node in (first_node, second_node))
def test_internal_config(start_connected_longer_cluster):
    """Checks that the internal configuration setting works.

    The fixture sets ``num_heartbeats_timeout`` to 20, which the original
    author describes as timing out a node after 2 seconds. A node is
    removed; after 1 second the cluster must still report its CPU (the
    state is out of sync), and after 2 more seconds (1 second of leeway)
    the node must have been timed out.
    """
    cluster = start_connected_longer_cluster
    extra_node = cluster.add_node()
    cluster.wait_for_nodes()

    cluster.remove_node(extra_node)

    def total_cpus():
        return ray.global_state.cluster_resources()["CPU"]

    # Too early for the timeout: the removed node is still counted.
    time.sleep(1)
    assert total_cpus() == 2

    # Past the timeout: only the head node's CPU remains.
    time.sleep(2)
    assert total_cpus() == 1
def test_wait_for_nodes(start_connected_cluster):
    """Unit test for `Cluster.wait_for_nodes`.

    Adds 4 workers, waits, then removes 4 workers, waits,
    then adds 1 worker, waits, and removes 1 worker, waits.
    """
    cluster = start_connected_cluster

    def total_cpus():
        return ray.global_state.cluster_resources()["CPU"]

    # Round one: four workers at once.
    workers = []
    for _ in range(4):
        workers.append(cluster.add_node())
    cluster.wait_for_nodes()

    for worker in workers:
        cluster.remove_node(worker)
    cluster.wait_for_nodes()
    assert total_cpus() == 1

    # Round two: a single worker.
    single_worker = cluster.add_node()
    cluster.wait_for_nodes()

    cluster.remove_node(single_worker)
    cluster.wait_for_nodes()
    assert total_cpus() == 1
def test_worker_plasma_store_failure(start_connected_cluster):
    """Check that a node's raylet exits after its plasma store is killed."""
    cluster = start_connected_cluster
    worker = cluster.add_node()
    cluster.wait_for_nodes()

    # Log monitor doesn't die for some reason, so kill it explicitly.
    worker.kill_log_monitor()
    worker.kill_plasma_store()

    # Block until the first raylet process of this node has exited.
    raylet_infos = worker.all_processes[ray_constants.PROCESS_TYPE_RAYLET]
    raylet_infos[0].process.wait()

    assert not worker.any_processes_alive(), worker.live_processes()
+63
View File
@@ -0,0 +1,63 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import ray
from ray.tests.cluster_utils import Cluster
from ray.tests.utils import run_string_as_driver
@pytest.fixture()
def ray_start_empty_cluster():
    """Yield a ``Cluster`` built with default arguments; stop it afterwards.
    """
    empty_cluster = Cluster()
    yield empty_cluster
    # Teardown: disconnect the driver, then stop the cluster.
    ray.shutdown()
    empty_cluster.shutdown()
# This tests the queue transitions for infeasible tasks. This has been an issue
# in the past, e.g., https://github.com/ray-project/ray/issues/3275.
def test_infeasible_tasks(ray_start_empty_cluster):
    """Check that infeasible tasks run once a suitable node is added.

    Covers a task submitted by this driver as well as one submitted by a
    driver that has already exited.
    """
    cluster = ray_start_empty_cluster

    @ray.remote
    def f():
        return

    def submit_with_resource(resource_index):
        # Require one unit of the custom resource named after the index.
        return f._submit(
            args=[], kwargs={}, resources={str(resource_index): 1})

    cluster.add_node(resources={str(0): 100})
    ray.init(redis_address=cluster.redis_address)

    # Submit an infeasible task: no node offers resource "1" yet.
    x_id = submit_with_resource(1)

    # Add a node that makes the task feasible and make sure we can get the
    # result.
    cluster.add_node(resources={str(1): 100})
    ray.get(x_id)

    # Start a driver that submits an infeasible task and then let it exit.
    driver_script = """
import ray
ray.init(redis_address="{}")
@ray.remote(resources={})
def f():
{}pass # This is a weird hack to insert some blank space.
f.remote()
""".format(cluster.redis_address, "{str(2): 1}", " ")
    run_string_as_driver(driver_script)

    # Now add a new node that makes the task feasible.
    cluster.add_node(resources={str(2): 100})

    # Make sure we can still run tasks on all nodes.
    pending_ids = []
    for resource_index in range(3):
        pending_ids.append(submit_with_resource(resource_index))
    ray.get(pending_ids)
+324
View File
@@ -0,0 +1,324 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from collections import defaultdict
import json
import multiprocessing
import numpy as np
import pytest
import time
import warnings
import ray
from ray.tests.cluster_utils import Cluster
# Warn at import time (rather than fail) when this machine has fewer than 40
# CPUs or reports less than 50 * 10**9 of system memory (presumably bytes;
# confirm against ray.utils.get_system_memory).
if (multiprocessing.cpu_count() < 40
        or ray.utils.get_system_memory() < 50 * 10**9):
    warnings.warn("This test must be run on large machines.")
def create_cluster(num_nodes):
    """Start a cluster of ``num_nodes`` nodes and connect this driver to it.

    Node ``i`` gets 100 units of a custom resource named ``str(i)`` and is
    started with ``object_store_memory=10**9``.

    Returns:
        The started ``Cluster``.
    """
    cluster = Cluster()
    for node_index in range(num_nodes):
        node_resources = {str(node_index): 100}
        cluster.add_node(
            resources=node_resources, object_store_memory=10**9)
    ray.init(redis_address=cluster.redis_address)
    return cluster
@pytest.fixture()
def ray_start_cluster():
    """Yield ``(cluster, num_nodes)`` for a connected five-node cluster."""
    node_count = 5
    cluster = create_cluster(node_count)
    yield cluster, node_count
    # Teardown: disconnect the driver, then stop the cluster.
    ray.shutdown()
    cluster.shutdown()
@pytest.fixture()
def ray_start_empty_cluster():
    """Yield a ``Cluster`` built with default arguments; stop it afterwards.
    """
    empty_cluster = Cluster()
    yield empty_cluster
    # Teardown: disconnect the driver, then stop the cluster.
    ray.shutdown()
    empty_cluster.shutdown()
def _check_object_transfers(object_ids, num_nodes):
    """Assert that each object was transferred a reasonable number of times.

    Reads the object transfer events from the profiling dump and, for every
    object ID, checks the number of ``transfer_send`` events against the
    number of nodes and checks that no (sender, receiver) pair repeats.

    Args:
        object_ids: Object IDs whose transfers should be inspected.
        num_nodes: Number of nodes in the cluster.
    """
    transfer_events = ray.global_state.chrome_tracing_object_transfer_dump()

    for x_id in object_ids:
        relevant_events = [
            event for event in transfer_events
            if event["cat"] == "transfer_send"
            and event["args"][0] == x_id.hex() and event["args"][2] == 1
        ]

        # NOTE: Each event currently appears twice because we duplicate the
        # send and receive boxes to underline them with a box (black if it is
        # a send and gray if it is a receive). So we need to remove these
        # extra boxes here.
        deduplicated_relevant_events = [
            event for event in relevant_events if event["cname"] != "black"
        ]
        assert len(deduplicated_relevant_events) * 2 == len(relevant_events)
        relevant_events = deduplicated_relevant_events

        # Each object must have been broadcast to each remote machine.
        assert len(relevant_events) >= num_nodes - 1
        # If more object transfers than necessary have been done, print a
        # warning.
        if len(relevant_events) > num_nodes - 1:
            warnings.warn("This object was transferred {} times, when only {} "
                          "transfers were required.".format(
                              len(relevant_events), num_nodes - 1))
        # Each object should not have been broadcast more than once from every
        # machine to every other machine. Also, a pair of machines should not
        # both have sent the object to each other.
        assert len(relevant_events) <= (num_nodes - 1) * num_nodes / 2

        # Make sure that no object was sent multiple times between the same
        # pair of object managers.
        send_counts = defaultdict(int)
        for event in relevant_events:
            # The pid identifies the sender and the tid identifies the
            # receiver.
            send_counts[(event["pid"], event["tid"])] += 1
        assert all(value == 1 for value in send_counts.values())


# This test is here to make sure that when we broadcast an object to a bunch of
# machines, we don't have too many excess object transfers.
def test_object_broadcast(ray_start_cluster):
    """Broadcast large objects to all nodes and check the transfer counts.

    Three objects are created with ``ray.put`` and three as task return
    values; each is passed to ten tasks per node.
    """
    cluster, num_nodes = ray_start_cluster

    @ray.remote
    def f(x):
        return

    x = np.zeros(10**8, dtype=np.uint8)

    @ray.remote
    def create_object():
        return np.zeros(10**8, dtype=np.uint8)

    object_ids = []

    for _ in range(3):
        # Broadcast an object to all machines.
        x_id = ray.put(x)
        object_ids.append(x_id)
        ray.get([
            f._remote(args=[x_id], resources={str(i % num_nodes): 1})
            for i in range(10 * num_nodes)
        ])

    for _ in range(3):
        # Broadcast an object to all machines.
        x_id = create_object.remote()
        object_ids.append(x_id)
        ray.get([
            f._remote(args=[x_id], resources={str(i % num_nodes): 1})
            for i in range(10 * num_nodes)
        ])

    # Wait for profiling information to be pushed to the profile table.
    time.sleep(1)

    # Make sure that each object was transferred a reasonable number of times.
    _check_object_transfers(object_ids, num_nodes)


# When submitting an actor method, we try to pre-emptively push its arguments
# to the actor's object manager. However, in the past we did not deduplicate
# the pushes and so the same object could get shipped to the same object
# manager many times. This test checks that that isn't happening.
def test_actor_broadcast(ray_start_cluster):
    """Pass large objects to 100 actors and check the transfer counts."""
    cluster, num_nodes = ray_start_cluster

    @ray.remote
    class Actor(object):
        def ready(self):
            pass

        def set_weights(self, x):
            pass

    actors = [
        Actor._remote(args=[], kwargs={}, resources={str(i % num_nodes): 1})
        for i in range(100)
    ]

    # Wait for the actors to start up.
    ray.get([a.ready.remote() for a in actors])

    object_ids = []

    # Broadcast a large object to all actors.
    for _ in range(10):
        x_id = ray.put(np.zeros(10**7, dtype=np.uint8))
        object_ids.append(x_id)
        # Pass the object into a method for every actor.
        ray.get([a.set_weights.remote(x_id) for a in actors])

    # Wait for profiling information to be pushed to the profile table.
    time.sleep(1)

    # Make sure that each object was transferred a reasonable number of times.
    _check_object_transfers(object_ids, num_nodes)
# The purpose of this test is to make sure that an object that has already
# been transferred to a node can be transferred again.
def test_object_transfer_retry(ray_start_empty_cluster):
    """Check that evicted objects can be re-transferred to the same node.

    Objects are fetched, flushed from the local store and fetched again:
    once before the repeated push delay has passed (which must take at
    least that delay) and once after waiting it out (which must be quick).
    """
    cluster = ray_start_empty_cluster

    repeated_push_delay = 10

    # Force the sending object manager to allow duplicate pushes again sooner.
    # Also, force the receiving object manager to retry the Pull sooner. We
    # make the chunk size smaller in order to make it easier to test objects
    # with multiple chunks.
    delay_ms = repeated_push_delay * 1000
    config = json.dumps({
        "object_manager_repeated_push_delay_ms": delay_ms,
        "object_manager_pull_timeout_ms": delay_ms / 4,
        "object_manager_default_chunk_size": 1000
    })
    object_store_memory = 10**8
    cluster.add_node(
        object_store_memory=object_store_memory, _internal_config=config)
    cluster.add_node(
        num_gpus=1,
        object_store_memory=object_store_memory,
        _internal_config=config)
    ray.init(redis_address=cluster.redis_address)

    @ray.remote(num_gpus=1)
    def f(size):
        return np.zeros(size, dtype=np.uint8)

    # Transfer an object to warm up the object manager.
    ray.get(f.remote(10**6))

    x_ids = [f.remote(10**exponent) for exponent in [1, 2, 3, 4]]

    def any_object_is_local():
        # True if the local plasma store holds at least one of the objects.
        plasma_client = ray.worker.global_worker.plasma_client
        return any(
            plasma_client.contains(
                ray.pyarrow.plasma.ObjectID(x_id.binary()))
            for x_id in x_ids)

    assert not any_object_is_local()

    # Get the objects locally to cause them to be transferred. This is the
    # first time the objects are getting transferred, so it should happen
    # quickly.
    start_time = time.time()
    xs = ray.get(x_ids)
    end_time = time.time()
    if end_time - start_time > repeated_push_delay:
        warnings.warn("The initial transfer took longer than the repeated "
                      "push delay, so this test may not be testing the thing "
                      "it's supposed to test.")

    # Cause all objects to be flushed.
    del xs
    filler = np.zeros(object_store_memory // 10, dtype=np.uint8)

    def flush_object_store():
        for _ in range(15):
            ray.put(filler)

    flush_object_store()
    assert not any_object_is_local()

    # Get the objects again and make sure they get transferred.
    xs = ray.get(x_ids)
    end_transfer_time = time.time()
    # We should have had to wait for the repeated push delay.
    assert end_transfer_time - start_time >= repeated_push_delay

    # Flush the objects again and wait longer than the repeated push delay and
    # make sure that the objects are transferred again.
    del xs
    flush_object_store()
    assert not any_object_is_local()

    time.sleep(repeated_push_delay)

    # Get the objects locally to cause them to be transferred. This should
    # happen quickly.
    start_time = time.time()
    ray.get(x_ids)
    end_time = time.time()
    assert end_time - start_time < repeated_push_delay
# The purpose of this test is to make sure we can transfer many objects. In the
# past, this has caused failures in which object managers create too many open
# files and run out of resources.
def test_many_small_transfers(ray_start_cluster):
    cluster, num_nodes = ray_start_cluster

    @ray.remote
    def f(*args):
        pass

    def do_transfers():
        # Submit 1000 no-argument tasks per node index; each task requests
        # one unit of the custom resource named after that index, so its
        # output object is created wherever that resource lives.
        id_lists = [[
            f._remote(args=[], kwargs={}, resources={str(node): 1})
            for _ in range(1000)
        ] for node in range(num_nodes)]

        # For every ordered pair of distinct indices, submit a task on the
        # destination that takes all of the source's objects as arguments,
        # which forces those objects to be transferred.
        ids = [
            f._remote(
                args=id_lists[src], kwargs={}, resources={str(dst): 1})
            for dst in range(num_nodes) for src in range(num_nodes)
            if dst != src
        ]

        # Wait for all of the transfers to finish.
        ray.get(ids)

    # Run several rounds of all-to-all transfers.
    for _ in range(4):
        do_transfers()
@@ -7,7 +7,7 @@ import pytest
import redis
import ray
from ray.test.cluster_utils import Cluster
from ray.tests.cluster_utils import Cluster
@pytest.fixture
+39
View File
@@ -0,0 +1,39 @@
# This test is not inside of runtest.py because when a recursive remote
# function is defined inside of another function, we currently can't handle
# that.
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import ray
@pytest.fixture
def ray_start():
    """Run the requesting test against a single-CPU Ray instance."""
    ray.init(num_cpus=1)
    # Hand control to the test; the code after the yield is teardown.
    yield
    ray.shutdown()
@ray.remote
def factorial(n):
    """Return n! by recursively submitting this same remote function."""
    # Each level blocks on the result of the next remote call.
    return 1 if n == 0 else n * ray.get(factorial.remote(n - 1))
def test_recursion(ray_start):
    # Check the recursive remote function against known factorials.
    expected_factorials = [(0, 1), (1, 1), (2, 2), (3, 6), (4, 24), (5, 120)]
    for n, expected in expected_factorials:
        assert ray.get(factorial.remote(n)) == expected
+281
View File
@@ -0,0 +1,281 @@
import pytest
import ray
import ray.experimental.signal as signal
class UserSignal(signal.Signal):
    """Signal used by these tests; it carries an arbitrary payload."""

    def __init__(self, value):
        # Payload that receivers read back through `.value`.
        self.value = value
@pytest.fixture
def ray_start():
    """Run the requesting test against a four-CPU Ray instance."""
    ray.init(num_cpus=4)
    # Hand control to the test; the code after the yield is teardown.
    yield
    ray.shutdown()
def receive_all_signals(sources, timeout):
    """Collect signals from `sources` until a receive call returns nothing.

    `signal.receive` is called repeatedly with the given `timeout`; the
    accumulated results are returned as soon as one call comes back empty.
    """
    collected = []
    batch = signal.receive(sources, timeout=timeout)
    while len(batch) != 0:
        collected.extend(batch)
        batch = signal.receive(sources, timeout=timeout)
    return collected
def test_task_to_driver(ray_start):
    # Send a signal from a task to the driver.

    @ray.remote
    def task_send_signal(value):
        signal.send(UserSignal(value))
        return

    signal_value = "simple signal"
    object_id = task_send_signal.remote(signal_value)
    result_list = signal.receive([object_id], timeout=10)
    # Check the count before indexing so that a missing signal is reported
    # as an assertion failure rather than as an IndexError.
    assert len(result_list) == 1
    print(result_list[0][1])
def test_send_signal_from_actor_to_driver(ray_start):
    # An actor sends several signals, one per method call; the driver then
    # drains them and checks that they come back in the order sent.

    @ray.remote
    class ActorSendSignal(object):
        def __init__(self):
            pass

        def send_signal(self, value):
            signal.send(UserSignal(value))

    a = ActorSendSignal.remote()
    signal_value = "simple signal"
    count = 6
    expected_values = [signal_value + str(i) for i in range(count)]
    for value in expected_values:
        # Block on every call before issuing the next one.
        ray.get(a.send_signal.remote(value))

    result_list = receive_all_signals([a], timeout=5)
    assert len(result_list) == count
    for expected, entry in zip(expected_values, result_list):
        # Index 1 of each received entry holds the signal object.
        assert expected == entry[1].value
def test_send_signals_from_actor_to_driver(ray_start):
    # Send "count" signals from a single actor method call and receive
    # them in the driver while the method may still be running.

    @ray.remote
    class ActorSendSignals(object):
        def __init__(self):
            pass

        def send_signals(self, value, count):
            for i in range(count):
                signal.send(UserSignal(value + str(i)))

    a = ActorSendSignals.remote()
    signal_value = "simple signal"
    count = 20
    a.send_signals.remote(signal_value, count)

    # Poll a bounded number of times instead of looping forever: each
    # receive call is given a 5 second timeout, so a lost signal makes the
    # test fail after roughly a minute rather than hang.
    max_attempts = 12
    received_count = 0
    for _ in range(max_attempts):
        received_count += len(signal.receive([a], timeout=5))
        if received_count >= count:
            break

    # Exactly "count" signals must have arrived (no losses, no duplicates).
    assert received_count == count
def test_task_crash(ray_start):
    # Get an error when ray.get() is called on the return of a failed task,
    # and check that an error signal was emitted for that task.

    @ray.remote
    def crashing_function():
        raise Exception("exception message")

    object_id = crashing_function.remote()
    # pytest.raises fails the test if ray.get() does not raise at all,
    # which a bare try/except would silently accept.
    with pytest.raises(ray.exceptions.RayTaskError):
        ray.get(object_id)

    result_list = signal.receive([object_id], timeout=5)
    assert len(result_list) == 1
    assert type(result_list[0][1]) == signal.ErrorSignal
def test_task_crash_without_get(ray_start):
    # A failing task should produce an error signal even when the driver
    # never calls ray.get() on its result.

    @ray.remote
    def crashing_function():
        raise Exception("exception message")

    object_id = crashing_function.remote()
    received = signal.receive([object_id], timeout=5)
    assert len(received) == 1
    error_signal = received[0][1]
    assert type(error_signal) == signal.ErrorSignal
def test_actor_crash(ray_start):
    # Get an error when ray.get() is called on a return parameter of a
    # method that failed, and check that the actor emitted an error signal.

    @ray.remote
    class Actor(object):
        def __init__(self):
            pass

        def crash(self):
            raise Exception("exception message")

    a = Actor.remote()
    # pytest.raises fails the test if ray.get() does not raise at all,
    # which a bare try/except would silently accept.
    with pytest.raises(ray.exceptions.RayTaskError):
        ray.get(a.crash.remote())

    result_list = signal.receive([a], timeout=5)
    assert len(result_list) == 1
    assert type(result_list[0][1]) == signal.ErrorSignal
def test_actor_crash_init(ray_start):
    # An actor whose __init__ raises should produce an error signal.

    @ray.remote
    class ActorCrashInit(object):
        def __init__(self):
            raise Exception("exception message")

        def m(self):
            return 1

    # Creating the actor is not wrapped in a try block; the failure is
    # observed only through the signal mechanism.
    a = ActorCrashInit.remote()
    received = signal.receive([a], timeout=5)
    assert len(received) == 1
    error_signal = received[0][1]
    assert type(error_signal) == signal.ErrorSignal
def test_actor_crash_init2(ray_start):
    # Get errors when (1) __init__ fails, and (2) subsequently when
    # ray.get() is called on the return parameter of another method
    # of the actor. Two signals are expected in total.

    @ray.remote
    class ActorCrashInit(object):
        def __init__(self):
            raise Exception("exception message")

        def method(self):
            return 1

    a = ActorCrashInit.remote()
    # pytest.raises fails the test if ray.get() does not raise at all,
    # which a bare try/except would silently accept.
    with pytest.raises(ray.exceptions.RayTaskError):
        ray.get(a.method.remote())

    result_list = receive_all_signals([a], timeout=5)
    assert len(result_list) == 2
    assert type(result_list[0][1]) == signal.ErrorSignal
def test_actor_crash_init3(ray_start):
    # __init__ fails and another method is then invoked without fetching
    # its result; a single receive call should return one error signal.

    @ray.remote
    class ActorCrashInit(object):
        def __init__(self):
            raise Exception("exception message")

        def method(self):
            return 1

    a = ActorCrashInit.remote()
    # Fire and forget: the return value is deliberately not fetched.
    a.method.remote()
    received = signal.receive([a], timeout=10)
    assert len(received) == 1
    error_signal = received[0][1]
    assert type(error_signal) == signal.ErrorSignal
def test_send_signals_from_actor_to_actor(ray_start):
    # Two actors each send "count" signals and a third actor receives all
    # of them.

    @ray.remote
    class ActorSendSignals(object):
        def __init__(self):
            pass

        def send_signals(self, value, count):
            for i in range(count):
                signal.send(UserSignal(value + str(i)))

    @ray.remote
    class ActorGetSignalsAll(object):
        def __init__(self):
            self.received_signals = []

        def register_handle(self, handle):
            # Keep a handle to this actor so it can resubmit work to itself.
            self.this_actor = handle

        def get_signals(self, source_ids, count):
            self.received_signals.extend(
                receive_all_signals(source_ids, timeout=5))
            if len(self.received_signals) >= count:
                return
            # Not enough signals yet: queue another round on this actor.
            self.this_actor.get_signals.remote(source_ids, count)

        def get_count(self):
            return len(self.received_signals)

    a1 = ActorSendSignals.remote()
    a2 = ActorSendSignals.remote()
    signal_value = "simple signal"
    count = 20
    # Wait for both senders to finish before starting the receiver.
    ray.get(a1.send_signals.remote(signal_value, count))
    ray.get(a2.send_signals.remote(signal_value, count))

    b = ActorGetSignalsAll.remote()
    ray.get(b.register_handle.remote(b))
    # NOTE(review): the result of get_signals is not fetched; this
    # presumably relies on the actor running its methods in submission
    # order so that get_count sees the collected signals - confirm.
    b.get_signals.remote([a1, a2], count)
    received_count = ray.get(b.get_count.remote())
    assert received_count == 2 * count
def test_forget(ray_start):
    # Send "count" signals on behalf of an actor, then forget all of these
    # signals, and then send another "count" signals on behalf of the same
    # actor. The driver should only get the last "count" signals.

    @ray.remote
    class ActorSendSignals(object):
        def __init__(self):
            pass

        def send_signals(self, value, count):
            for i in range(count):
                signal.send(UserSignal(value + str(i)))

    a = ActorSendSignals.remote()
    signal_value = "simple signal"
    count = 5

    # First batch: sent and then discarded.
    ray.get(a.send_signals.remote(signal_value, count))
    signal.forget([a])

    # Second batch: the only signals that should be received.
    ray.get(a.send_signals.remote(signal_value, count))
    received = receive_all_signals([a], timeout=5)
    assert len(received) == count
+556
View File
@@ -0,0 +1,556 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import numpy as np
import os
import pytest
import time
import ray
from ray.tests.cluster_utils import Cluster
import ray.ray_constants as ray_constants
@pytest.fixture(params=[1, 20])
def ray_start_sharded(request):
    """Start Ray with a parametrized number of Redis shards."""
    # For now, RAY_USE_NEW_GCS supports 1 shard, and credis supports
    # 1-node chain for that shard only.
    use_new_gcs = os.environ.get("RAY_USE_NEW_GCS") == "on"
    num_redis_shards = 1 if use_new_gcs else request.param

    ray.init(
        num_cpus=10,
        num_redis_shards=num_redis_shards,
        redis_max_memory=10**7)
    # Hand control to the test; the code after the yield is teardown.
    yield
    ray.shutdown()
@pytest.fixture(params=[(1, 4), (4, 4)])
def ray_start_combination(request):
    """Start a cluster of `num_nodes` nodes and connect the driver to it.

    Yields `(num_nodes, num_workers_per_scheduler, cluster)`.
    """
    num_nodes, num_workers_per_scheduler = request.param

    head_node_args = {"num_cpus": 10, "redis_max_memory": 10**7}
    cluster = Cluster(initialize_head=True, head_node_args=head_node_args)
    # The head node counts as the first node; add the remaining ones.
    for _ in range(num_nodes - 1):
        cluster.add_node(num_cpus=10)
    ray.init(redis_address=cluster.redis_address)

    yield num_nodes, num_workers_per_scheduler, cluster

    # The code after the yield will run as teardown code.
    ray.shutdown()
    cluster.shutdown()
def test_submitting_tasks(ray_start_combination):
    _, _, cluster = ray_start_combination

    @ray.remote
    def f(x):
        return x

    # Submit the same total number of tasks in progressively smaller
    # batches, waiting for each batch before submitting the next.
    batching_plans = [(1, 1000), (10, 100), (100, 10), (1000, 1)]
    for num_rounds, batch_size in batching_plans:
        for _ in range(num_rounds):
            ray.get([f.remote(1) for _ in range(batch_size)])

    assert cluster.remaining_processes_alive()
def test_dependencies(ray_start_combination):
    _, _, cluster = ray_start_combination

    @ray.remote
    def f(x):
        return x

    # Build a chain of 1000 tasks, each depending on the previous one.
    chained = 1
    for _ in range(1000):
        chained = f.remote(chained)
    ray.get(chained)

    @ray.remote
    def g(*xs):
        return 1

    # Build a growing fan-in: each round submits one task that depends on
    # every task submitted so far, plus one independent task.
    pending = [g.remote(1)]
    for _ in range(100):
        pending.append(g.remote(*pending))
        pending.append(g.remote(1))
    ray.get(pending)

    assert cluster.remaining_processes_alive()
def test_submitting_many_tasks(ray_start_sharded):
    @ray.remote
    def f(x):
        return 1

    def g(n):
        # Submit a chain of n tasks and return the last object ID.
        last = 1
        for _ in range(n):
            last = f.remote(last)
        return last

    # 100 independent chains of 1000 dependent tasks each.
    chain_tails = [g(1000) for _ in range(100)]
    ray.get(chain_tails)
    assert ray.services.remaining_processes_alive()
def test_submitting_many_actors_to_one(ray_start_sharded):
    @ray.remote
    class Actor(object):
        def __init__(self):
            pass

        def ping(self):
            return

    @ray.remote
    class Worker(object):
        def __init__(self, actor):
            self.actor = actor

        def ping(self):
            # Forward the ping to the shared actor and wait for the reply.
            return ray.get(self.actor.ping.remote())

    a = Actor.remote()
    workers = [Worker.remote(a) for _ in range(100)]
    expected = [None] * len(workers)
    # All 100 workers ping the single shared actor, ten rounds in a row.
    for _ in range(10):
        out = ray.get([w.ping.remote() for w in workers])
        assert out == expected
def test_getting_and_putting(ray_start_sharded):
    # Exercise put/get with arrays of 10**0 up to 10**7 elements.
    for exponent in range(8):
        x = np.zeros(10**exponent)

        # Put the same array many times.
        for _ in range(100):
            ray.put(x)

        # Then get a single stored copy many times.
        x_id = ray.put(x)
        for _ in range(1000):
            ray.get(x_id)

    assert ray.services.remaining_processes_alive()
def test_getting_many_objects(ray_start_sharded):
    @ray.remote
    def f():
        return 1

    n = 10**4  # TODO(pcm): replace by 10 ** 5 once this is faster.
    # Fetch all n results with a single ray.get call.
    object_ids = [f.remote() for _ in range(n)]
    lst = ray.get(object_ids)
    assert lst == [1] * n

    assert ray.services.remaining_processes_alive()
def test_wait(ray_start_combination):
    num_nodes, num_workers_per_scheduler, cluster = ray_start_combination
    num_workers = num_nodes * num_workers_per_scheduler

    @ray.remote
    def f(x):
        return x

    x_ids = [f.remote(i) for i in range(100)]
    # Wait on each object individually.
    for x_id in x_ids:
        ray.wait([x_id])
    # Wait on progressively shorter suffixes of the list.
    for start in range(len(x_ids) - 1):
        ray.wait(x_ids[start:])

    @ray.remote
    def g(x):
        time.sleep(x)

    # Wait for batches of tasks that sleep for random durations, with the
    # upper bound on the duration growing each round.
    for max_sleep in range(1, 5):
        x_ids = [
            g.remote(np.random.uniform(0, max_sleep))
            for _ in range(2 * num_workers)
        ]
        ray.wait(x_ids, num_returns=len(x_ids))

    assert cluster.remaining_processes_alive()
@pytest.fixture(params=[1, 4])
def ray_start_reconstruction(request):
    """Start a cluster whose nodes split a fixed object store budget.

    Yields `(plasma_store_memory, num_nodes, cluster)`.
    """
    num_nodes = request.param
    plasma_store_memory = int(0.5 * 10**9)
    # Every node gets an equal share of the total object store memory.
    memory_per_node = plasma_store_memory // num_nodes
    internal_config = json.dumps({
        "initial_reconstruction_timeout_milliseconds": 200
    })

    cluster = Cluster(
        initialize_head=True,
        head_node_args={
            "num_cpus": 1,
            "object_store_memory": memory_per_node,
            "redis_max_memory": 10**7,
            "_internal_config": internal_config
        })
    # The head node counts as the first node; add the remaining ones.
    for _ in range(num_nodes - 1):
        cluster.add_node(
            num_cpus=1,
            object_store_memory=memory_per_node,
            _internal_config=internal_config)
    ray.init(redis_address=cluster.redis_address)

    yield plasma_store_memory, num_nodes, cluster

    # Clean up the Ray cluster.
    ray.shutdown()
    cluster.shutdown()
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Failing with new GCS API on Linux.")
def test_simple(ray_start_reconstruction):
    plasma_store_memory, num_nodes, cluster = ray_start_reconstruction
    # Size each task's return value (an array of 8-byte floats) so that all
    # objects together take about 1.5 times the plasma stores' combined
    # allotted memory.
    num_objects = 100
    size = int(plasma_store_memory * 1.5 / (num_objects * 8))

    # A remote task with no dependencies that returns a numpy array of the
    # given size, tagged with its index in the first element.
    @ray.remote
    def foo(i, size):
        array = np.zeros(size)
        array[0] = i
        return array

    # Launch num_objects instances of the remote task.
    args = [foo.remote(i, size) for i in range(num_objects)]

    # Get each value to force each task to finish. After some number of
    # gets, old values should be evicted.
    for i, object_id in enumerate(args):
        assert ray.get(object_id)[0] == i
    # Get each value again to force reconstruction.
    for i, object_id in enumerate(args):
        assert ray.get(object_id)[0] == i

    # Get values sequentially, in chunks; the results are discarded.
    num_chunks = 4 * num_nodes
    chunk = num_objects // num_chunks
    for i in range(num_chunks):
        begin = i * chunk
        ray.get(args[begin:begin + chunk])

    assert cluster.remaining_processes_alive()
def sorted_random_indexes(total, output_num):
    """Return `output_num` random indexes in `[0, total)`, sorted ascending.

    Indexes are drawn independently, so the result may contain duplicates.
    """
    return sorted(np.random.randint(total) for _ in range(output_num))
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Failing with new GCS API on Linux.")
def test_recursive(ray_start_reconstruction):
    plasma_store_memory, num_nodes, cluster = ray_start_reconstruction
    # Size each task's return value (an array of 8-byte floats) so that all
    # objects together take about 1.5 times the plasma stores' combined
    # allotted memory.
    num_objects = 100
    size = int(plasma_store_memory * 1.5 / (num_objects * 8))

    # Root task with no dependencies; returns a numpy array of the given
    # size.
    @ray.remote
    def no_dependency_task(size):
        return np.zeros(size)

    # Task with a single dependency; returns a copy of its argument with
    # the first element set to the task's index.
    @ray.remote
    def single_dependency(i, arg):
        tagged = np.copy(arg)
        tagged[0] = i
        return tagged

    # Launch num_objects instances of the remote task, each dependent on
    # the one before it.
    previous = no_dependency_task.remote(size)
    args = []
    for i in range(num_objects):
        previous = single_dependency.remote(i, previous)
        args.append(previous)

    # Get each value to force each task to finish. After some number of
    # gets, old values should be evicted.
    for i, object_id in enumerate(args):
        assert ray.get(object_id)[0] == i
    # Get each value again to force reconstruction.
    for i, object_id in enumerate(args):
        assert ray.get(object_id)[0] == i
    # Get 10 values randomly.
    for i in sorted_random_indexes(num_objects, 10):
        assert ray.get(args[i])[0] == i

    # Get values sequentially, in chunks; the results are discarded.
    num_chunks = 4 * num_nodes
    chunk = num_objects // num_chunks
    for i in range(num_chunks):
        begin = i * chunk
        ray.get(args[begin:begin + chunk])

    assert cluster.remaining_processes_alive()
@pytest.mark.skip(reason="This test often hangs or fails in CI.")
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Failing with new GCS API on Linux.")
def test_multiple_recursive(ray_start_reconstruction):
    plasma_store_memory, _, cluster = ray_start_reconstruction
    # Size each task's return value (an array of 8-byte floats) so that all
    # objects together take about twice the plasma stores' combined
    # allotted memory.
    num_objects = 100
    size = plasma_store_memory * 2 // (num_objects * 8)

    # Root task with no dependencies; returns a numpy array of the given
    # size.
    @ray.remote
    def no_dependency_task(size):
        return np.zeros(size)

    # Task with multiple dependencies; returns a copy of its first array
    # argument with the first element set to the task's index.
    @ray.remote
    def multiple_dependency(i, arg1, arg2, arg3):
        tagged = np.copy(arg1)
        tagged[0] = i
        return tagged

    # Launch num_args instances of the root task. Then launch num_objects
    # instances of the multi-dependency remote task, each dependent on the
    # num_args tasks before it.
    num_args = 3
    args = [no_dependency_task.remote(size) for _ in range(num_args)]
    for i in range(num_objects):
        dependencies = args[i:i + num_args]
        args.append(multiple_dependency.remote(i, *dependencies))

    # Drop the root tasks so that args[i] is the i-th dependent task.
    args = args[num_args:]

    # Get each value to force each task to finish. After some number of
    # gets, old values should be evicted.
    for i, object_id in enumerate(args):
        assert ray.get(object_id)[0] == i
    # Get each value again to force reconstruction.
    for i, object_id in enumerate(args):
        assert ray.get(object_id)[0] == i
    # Get 10 values randomly.
    for i in sorted_random_indexes(num_objects, 10):
        assert ray.get(args[i])[0] == i

    assert cluster.remaining_processes_alive()
def wait_for_errors(error_check):
    """Poll `ray.error_info()` until `error_check` accepts the errors.

    The errors are fetched up to 100 times, sleeping one second after each
    rejected attempt. The last fetched list is asserted against
    `error_check` and then returned.
    """
    errors = []
    for _ in range(100):
        errors = ray.error_info()
        if error_check(errors):
            break
        time.sleep(1)

    # Make sure that enough errors came through.
    assert error_check(errors)
    return errors
@pytest.mark.skip("This test does not work yet.")
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Failing with new GCS API on Linux.")
def test_nondeterministic_task(ray_start_reconstruction):
    plasma_store_memory, num_nodes, cluster = ray_start_reconstruction
    # Size each task's return value (an array of 8-byte floats) so that all
    # objects together take about twice the plasma stores' combined
    # allotted memory.
    num_objects = 1000
    size = plasma_store_memory * 2 // (num_objects * 8)

    # Nondeterministic remote task with no dependencies, which returns a
    # random numpy array of the given size. This task should produce an
    # error on the driver if it is ever reexecuted.
    @ray.remote
    def foo(i, size):
        array = np.random.rand(size)
        array[0] = i
        return array

    # Deterministic remote task with no dependencies, which returns a numpy
    # array of zeros of the given size.
    @ray.remote
    def bar(i, size):
        array = np.zeros(size)
        array[0] = i
        return array

    # Launch num_objects instances: even indexes run the nondeterministic
    # task and odd indexes run the deterministic one.
    args = []
    for i in range(num_objects):
        task = foo if i % 2 == 0 else bar
        args.append(task.remote(i, size))

    # Get each value to force each task to finish. After some number of
    # gets, old values should be evicted.
    for i, object_id in enumerate(args):
        assert ray.get(object_id)[0] == i
    # Get each value again to force reconstruction.
    for i, object_id in enumerate(args):
        assert ray.get(object_id)[0] == i

    def error_check(errors):
        if num_nodes == 1:
            # In a single-node setting, each object is evicted and
            # reconstructed exactly once, so exactly half the objects will
            # produce an error during reconstruction.
            return len(errors) >= num_objects // 2
        # In a multinode setting, each object is evicted zero or one
        # times, so some of the nondeterministic tasks may not be
        # reexecuted.
        return len(errors) >= 1

    errors = wait_for_errors(error_check)
    # Make sure all the errors have the correct type.
    for error in errors:
        assert error["type"] == ray_constants.HASH_MISMATCH_PUSH_ERROR

    assert cluster.remaining_processes_alive()
@pytest.fixture
def ray_start_driver_put_errors():
    """Start single-CPU Ray with a 10**9 byte object store.

    Yields the object store size so the test can size its objects.
    """
    object_store_bytes = 10**9
    ray.init(num_cpus=1, object_store_memory=object_store_bytes)
    yield object_store_bytes
    # The code after the yield will run as teardown code.
    ray.shutdown()
@pytest.mark.skipif(
    os.environ.get("RAY_USE_NEW_GCS") == "on",
    reason="Failing with new GCS API on Linux.")
def test_driver_put_errors(ray_start_driver_put_errors):
    plasma_store_memory = ray_start_driver_put_errors
    # Size each task's return value (an array of 8-byte floats) so that all
    # objects together take about twice the plasma store's allotted
    # memory.
    num_objects = 100
    size = plasma_store_memory * 2 // (num_objects * 8)

    # Task with a single dependency, a numpy array, that returns a copy of
    # it with the first element set to the task's index.
    @ray.remote
    def single_dependency(i, arg):
        tagged = np.copy(arg)
        tagged[0] = i
        return tagged

    # Launch num_objects instances of the remote task, each dependent on
    # the one before it. The first instance of the task takes a numpy array
    # as an argument, which is put into the object store.
    previous = single_dependency.remote(0, np.zeros(size))
    args = []
    for i in range(num_objects):
        previous = single_dependency.remote(i, previous)
        args.append(previous)

    # Get each value to force each task to finish. After some number of
    # gets, old values should be evicted.
    for i, object_id in enumerate(args):
        assert ray.get(object_id)[0] == i

    # Request reconstruction of the first object. Currently, since we're
    # not able to reconstruct `ray.put` objects that were evicted and whose
    # originating tasks are still running, this should push an error to
    # the driver.
    raylet_client = ray.worker.global_worker.raylet_client
    raylet_client.fetch_or_reconstruct([args[0]], False)

    def error_check(errors):
        return len(errors) > 1

    errors = wait_for_errors(error_check)
    for error in errors:
        assert error["type"] == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR
# NOTE(swang): This test tries to launch 1000 workers and breaks.
# TODO(rkn): This test needs to be updated to use pytest.
# class WorkerPoolTests(unittest.TestCase):
#
# def tearDown(self):
# ray.shutdown()
#
# def testBlockingTasks(self):
# @ray.remote
# def f(i, j):
# return (i, j)
#
# @ray.remote
# def g(i):
# # Each instance of g submits and blocks on the result of another remote
# # task.
# object_ids = [f.remote(i, j) for j in range(10)]
# return ray.get(object_ids)
#
# ray.init(num_workers=1)
# ray.get([g.remote(i) for i in range(1000)])
# ray.shutdown()
+105
View File
@@ -0,0 +1,105 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import shutil
import time
import pytest
import ray
def test_conn_cluster():
    # Each of these arguments must be rejected when ray.init() is asked to
    # connect to an existing cluster via redis_address.
    cases = [
        # plasma_store_socket_name
        ({
            "plasma_store_socket_name": "/tmp/this_should_fail"
        }, "When connecting to an existing cluster, "
         "plasma_store_socket_name must not be provided."),
        # raylet_socket_name
        ({
            "raylet_socket_name": "/tmp/this_should_fail"
        }, "When connecting to an existing cluster, "
         "raylet_socket_name must not be provided."),
        # temp_dir
        ({
            "temp_dir": "/tmp/this_should_fail"
        }, "When connecting to an existing cluster, "
         "temp_dir must not be provided."),
    ]
    for rejected_kwargs, expected_message in cases:
        with pytest.raises(Exception) as exc_info:
            ray.init(redis_address="127.0.0.1:6379", **rejected_kwargs)
        assert exc_info.value.args[0] == expected_message
def test_tempdir():
    # Check that ray.init() creates the temp dir it is told to use.
    temp_dir = "/tmp/i_am_a_temp_dir"
    ray.init(temp_dir=temp_dir)
    try:
        assert os.path.exists(temp_dir), "Specified temp dir not found."
    finally:
        # Always shut Ray down and remove the directory, even when the
        # assertion fails, so that later tests start from a clean state.
        ray.shutdown()
        shutil.rmtree(temp_dir, ignore_errors=True)
def test_raylet_socket_name():
    # Check that the raylet socket is created at the requested path.
    socket_path = "/tmp/i_am_a_temp_socket"
    ray.init(raylet_socket_name=socket_path)
    try:
        assert os.path.exists(
            socket_path), "Specified socket path not found."
    finally:
        # Always shut Ray down, even when the assertion fails, so that
        # later tests start from a clean state.
        ray.shutdown()
        try:
            os.remove(socket_path)
        except OSError:
            # Best-effort cleanup: the socket file may already be gone.
            pass
def test_temp_plasma_store_socket():
    # Check that the plasma store socket is created at the requested path.
    socket_path = "/tmp/i_am_a_temp_socket"
    ray.init(plasma_store_socket_name=socket_path)
    try:
        assert os.path.exists(
            socket_path), "Specified socket path not found."
    finally:
        # Always shut Ray down, even when the assertion fails, so that
        # later tests start from a clean state.
        ray.shutdown()
        try:
            os.remove(socket_path)
        except OSError:
            # Best-effort cleanup: the socket file may already be gone.
            pass
def test_raylet_tempfiles():
    # Check the layout of the temp dir (logs and sockets) that a node
    # creates, first without workers and then with num_cpus=2.
    expected_log_files = {
        "log_monitor.out", "log_monitor.err", "plasma_store.out",
        "plasma_store.err", "monitor.out", "monitor.err", "raylet_monitor.out",
        "raylet_monitor.err", "redis-shard_0.out", "redis-shard_0.err",
        "redis.out", "redis.err", "raylet.out", "raylet.err"
    }  # with raylet logs
    expected_socket_files = {"plasma_store", "raylet"}

    ray.init(num_cpus=0)
    try:
        node = ray.worker._global_node
        top_levels = set(os.listdir(node.get_temp_dir_path()))
        assert top_levels.issuperset({"sockets", "logs"})
        log_files = set(os.listdir(node.get_logs_dir_path()))
        assert log_files.issuperset(expected_log_files)
        socket_files = set(os.listdir(node.get_sockets_dir_path()))
        assert socket_files == expected_socket_files
    finally:
        # Shut down even on failure so the second phase (and later tests)
        # can call ray.init() again.
        ray.shutdown()

    ray.init(num_cpus=2)
    try:
        node = ray.worker._global_node
        top_levels = set(os.listdir(node.get_temp_dir_path()))
        assert top_levels.issuperset({"sockets", "logs"})
        time.sleep(3)  # wait workers to start
        log_files = set(os.listdir(node.get_logs_dir_path()))
        assert log_files.issuperset(expected_log_files)

        # Check numbers of worker log file.
        assert sum(
            1 for filename in log_files if filename.startswith("worker")) == 4

        socket_files = set(os.listdir(node.get_sockets_dir_path()))
        assert socket_files == expected_socket_files
    finally:
        ray.shutdown()
+261
View File
@@ -0,0 +1,261 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from numpy.testing import assert_almost_equal
import pytest
import tensorflow as tf
import ray
def make_linear_network(w_name=None, b_name=None):
    """Build a linear model `y = w * x + b` in the current default graph.

    `w_name` and `b_name` are passed as the names of the two variables.
    Returns `(loss, init, x_data, y_data)`: the mean squared error, the
    global variable initializer, and the two input placeholders.
    """
    # Define the inputs.
    x_data = tf.placeholder(tf.float32, shape=[100])
    y_data = tf.placeholder(tf.float32, shape=[100])

    # Define the weights and computation.
    w = tf.Variable(tf.random_uniform([1], -1.0, 1.0), name=w_name)
    b = tf.Variable(tf.zeros([1]), name=b_name)
    y = w * x_data + b

    # Build the loss first and the initializer second, then return both
    # together with the placeholders.
    loss = tf.reduce_mean(tf.square(y - y_data))
    init = tf.global_variables_initializer()
    return (loss, init, x_data, y_data)
class LossActor(object):
    """Linear network plus one extra variable passed as `input_variables`.

    `self.values` holds `[weights, init, sess]`.
    """

    def __init__(self, use_loss=True):
        # Uses a separate graph for each network.
        with tf.Graph().as_default():
            # Create the extra variable first, then the network.
            var = [tf.Variable(1)]
            loss, init, _, _ = make_linear_network()
            sess = tf.Session()
            # With use_loss=False only the extra variable is handed to
            # TensorFlowVariables.
            output = loss if use_loss else None
            weights = ray.experimental.TensorFlowVariables(
                output, sess, input_variables=var)
        # Keep all of the data needed to use the network.
        self.values = [weights, init, sess]
        sess.run(init)

    def set_and_get_weights(self, weights):
        helper = self.values[0]
        helper.set_weights(weights)
        return helper.get_weights()

    def get_weights(self):
        helper = self.values[0]
        return helper.get_weights()
class NetActor(object):
    """Linear network built in its own graph.

    `self.values` holds `[variables, init, sess]`.
    """

    def __init__(self):
        # Uses a separate graph for each network.
        with tf.Graph().as_default():
            # Create the network.
            loss, init, _, _ = make_linear_network()
            sess = tf.Session()
            # Helper for setting and getting the weights.
            variables = ray.experimental.TensorFlowVariables(loss, sess)
        # Keep all of the data needed to use the network.
        self.values = [variables, init, sess]
        sess.run(init)

    def set_and_get_weights(self, weights):
        helper = self.values[0]
        helper.set_weights(weights)
        return helper.get_weights()

    def get_weights(self):
        helper = self.values[0]
        return helper.get_weights()
class TrainActor(object):
    """Linear network that also exposes its gradients and a train op.

    `self.values` holds
    `[loss, variables, init, sess, grads, train, [x_data, y_data]]`.
    """

    def __init__(self):
        # Almost the same as NetActor, but also keeps the placeholders and
        # the gradient ops.
        with tf.Graph().as_default():
            loss, init, x_data, y_data = make_linear_network()
            sess = tf.Session()
            variables = ray.experimental.TensorFlowVariables(loss, sess)
            optimizer = tf.train.GradientDescentOptimizer(0.9)
            grads = optimizer.compute_gradients(loss)
            train = optimizer.apply_gradients(grads)
        placeholders = [x_data, y_data]
        self.values = [loss, variables, init, sess, grads, train, placeholders]
        sess.run(init)

    def training_step(self, weights):
        # Load the given weights, then evaluate the gradient tensors on a
        # fixed batch (all ones for the first placeholder, all twos for the
        # second).
        variables = self.values[1]
        sess = self.values[3]
        grads = self.values[4]
        placeholders = self.values[6]
        variables.set_weights(weights)
        batch = [[1] * 100, [2] * 100]
        gradient_tensors = [grad[0] for grad in grads]
        return sess.run(
            gradient_tensors, feed_dict=dict(zip(placeholders, batch)))

    def get_weights(self):
        variables = self.values[1]
        return variables.get_weights()
@pytest.fixture
def ray_start_regular():
    """Run the requesting test against a two-CPU Ray instance."""
    ray.init(num_cpus=2)
    # Hand control to the test; the code after the yield is teardown.
    yield
    ray.shutdown()
def test_tensorflow_variables(ray_start_regular):
    sess = tf.Session()

    # First network with default variable names: round-trip its weights
    # through the dictionary API.
    loss, init, _, _ = make_linear_network()
    sess.run(init)
    variables = ray.experimental.TensorFlowVariables(loss, sess)
    weights = variables.get_weights()
    for name in list(weights):
        weights[name] += 1.0
    variables.set_weights(weights)
    assert weights == variables.get_weights()

    # Second network with explicit variable names.
    loss2, init2, _, _ = make_linear_network("w", "b")
    sess.run(init2)
    variables2 = ray.experimental.TensorFlowVariables(loss2, sess)
    weights2 = variables2.get_weights()
    for name in list(weights2):
        weights2[name] += 2.0
    variables2.set_weights(weights2)
    assert weights2 == variables2.get_weights()

    # Round-trip the weights through the flat API as well.
    flat_weights = variables2.get_flat() + 2.0
    variables2.set_flat(flat_weights)
    assert_almost_equal(flat_weights, variables2.get_flat())

    # A helper created without a session has none until one is set.
    variables3 = ray.experimental.TensorFlowVariables([loss2])
    assert variables3.sess is None
    sess = tf.Session()
    variables3.set_session(sess)
    assert variables3.sess == sess
# Test that the variable names for the two different nets are not
# modified by TensorFlow to be unique (i.e., they should already
# be unique because of the variable prefix).
def test_variable_name_collision(ray_start_regular):
    net1 = NetActor()
    net2 = NetActor()

    # This is checking that the variable names of the two nets are the
    # same, i.e., that the names in the weight dictionaries are the same.
    weights_from_net2 = net2.values[0].get_weights()
    net1.values[0].set_weights(weights_from_net2)
# Test that TensorFlowVariables can take in addition variables through
# input_variables arg and with no loss.
def test_additional_variables_no_loss(ray_start_regular):
    net = LossActor(use_loss=False)
    helper = net.values[0]
    # Only the extra variable passed through input_variables is tracked.
    assert len(helper.variables.items()) == 1
    assert len(helper.placeholders.items()) == 1

    # Setting the weights to their current values should just work.
    helper.set_weights(helper.get_weights())
# Test that TensorFlowVariables can take in addition variables through
# input_variables arg and with a loss.
def test_additional_variables_with_loss(ray_start_regular):
    net = LossActor()
    helper = net.values[0]
    # The two network variables plus the extra input variable are tracked.
    assert len(helper.variables.items()) == 3
    assert len(helper.placeholders.items()) == 3

    # Setting the weights to their current values should just work.
    helper.set_weights(helper.get_weights())
# Test that different networks on the same worker are independent and
# we can get/set their weights without any interaction.
def test_networks_independent(ray_start_regular):
    # Both networks are created in this process, each in its own graph.
    net1 = NetActor()
    net2 = NetActor()

    # Make sure the two networks have different weights. TODO(rkn): Note
    # that equality comparisons of numpy arrays normally does not work.
    # This only works because at the moment they have size 1.
    weights1 = net1.get_weights()
    weights2 = net2.get_weights()
    assert weights1 != weights2

    # Set each network's own weights and read them back; they must be
    # unchanged.
    own1 = net1.set_and_get_weights(weights1)
    own2 = net2.set_and_get_weights(weights2)
    assert weights1 == own1
    assert weights2 == own2

    # Swap the weights between the networks and read them back.
    swapped1 = net2.set_and_get_weights(weights1)
    swapped2 = net1.set_and_get_weights(weights2)
    assert weights1 == swapped1
    assert weights2 == swapped2
def test_network_driver_worker_independent(ray_start_regular):
    """Check a remote network is unaffected by a network on the driver.

    An extra network is built locally on the driver so that the TensorFlow
    variables on the driver and on the worker differ; the remote actor's
    weights must still survive a get/set round trip.
    """
    # Build and initialize a network locally on the driver.
    driver_sess = tf.Session()
    driver_loss, driver_init, _, _ = make_linear_network()
    ray.experimental.TensorFlowVariables(driver_loss, driver_sess)
    driver_sess.run(driver_init)

    # Create the remote network and record its initial weights.
    remote_net = ray.remote(NetActor).remote()
    initial_weights = ray.get(remote_net.get_weights.remote())

    # Pass the weights back to the actor as an object ID and read the result.
    weights_id = remote_net.get_weights.remote()
    echoed_id = remote_net.set_and_get_weights.remote(weights_id)
    echoed_weights = ray.get(echoed_id)
    assert initial_weights == echoed_weights
def test_variables_control_dependencies(ray_start_regular):
    """Check that variables behind an optimizer op are all discovered.

    TensorFlowVariables is built from the minimize op of a momentum
    optimizer rather than from the loss itself.
    """
    # Build a linear network and attach a momentum optimizer to its loss.
    session = tf.Session()
    loss, init_op, _, _ = make_linear_network()
    optimizer = tf.train.MomentumOptimizer(0.9, 0.9)
    minimize_op = optimizer.minimize(loss)
    collected = ray.experimental.TensorFlowVariables(minimize_op, session)
    session.run(init_op)

    # Expect the 2 network variables plus their 2 momentum variables.
    expected_count = 4
    assert len(collected.variables.items()) == expected_count
def test_remote_training_step(ray_start_regular):
    """Run one remote training step fed with the actor's own weights."""
    trainer = ray.remote(TrainActor).remote()
    # The weights are passed along as an object ID, not fetched first.
    weights_id = trainer.get_weights.remote()
    step_id = trainer.training_step.remote(weights_id)
    # Only successful completion matters; the result itself is discarded.
    ray.get(step_id)
def test_remote_training_loss(ray_start_regular):
    """Train a local network using gradients computed by a remote actor.

    For three rounds, two remote training steps are launched with the local
    weights, their gradients are averaged element-wise, and the averages
    are applied to the local network. The loss is measured before and after.
    """
    remote_trainer = ray.remote(TrainActor).remote()
    local_values = TrainActor().values
    loss, variables, _, sess, grads, train, placeholders = local_values

    def measure_loss():
        # Evaluate the local loss on a fixed batch of 100 (2, 4) samples.
        eval_feed = dict(zip(placeholders, [[2] * 100, [4] * 100]))
        return sess.run(loss, feed_dict=eval_feed)

    before_acc = measure_loss()

    for _round in range(3):
        # Launch two remote steps, each seeded with the local weights.
        pending = [
            remote_trainer.training_step.remote(variables.get_weights())
            for _replica in range(2)
        ]
        gradients_list = ray.get(pending)

        # Average the i-th gradient across all returned gradient lists.
        num_results = len(gradients_list)
        num_gradients = len(gradients_list[0])
        mean_grads = [
            sum(result[i] for result in gradients_list) / num_results
            for i in range(num_gradients)
        ]

        # Key each averaged gradient by the first element of its grads entry.
        averaged_feed = {}
        for grad, mean_grad in zip(grads, mean_grads):
            averaged_feed[grad[0]] = mean_grad
        sess.run(train, feed_dict=averaged_feed)

    after_acc = measure_loss()
    # NOTE(review): despite the ``*_acc`` names these are loss values, and
    # this requires the loss to be larger after training -- confirm that
    # this is the intended direction.
    assert before_acc < after_acc
+2 -2
View File
@@ -16,8 +16,8 @@ except ImportError:
import ray
from ray import tune
from ray.rllib import _register_all
from ray.test.cluster_utils import Cluster
from ray.test.test_utils import run_string_as_driver_nonblocking
from ray.tests.cluster_utils import Cluster
from ray.tests.utils import run_string_as_driver_nonblocking
from ray.tune.error import TuneError
from ray.tune.experiment import Experiment
from ray.tune.trial import Trial