Shard unit tests into medium sized files for test stability (#6398)

This commit is contained in:
Eric Liang
2019-12-09 13:15:29 -08:00
committed by GitHub
parent a6bc2b1842
commit 304b4f0d3d
27 changed files with 3049 additions and 2832 deletions
+4
View File
@@ -16,6 +16,10 @@ def env_integer(key, default):
return default
def direct_call_enabled():
return bool(int(os.environ.get("RAY_FORCE_DIRECT", "0")))
ID_SIZE = 20
# The default maximum number of bytes to allocate to the object store unless
+38 -11
View File
@@ -56,33 +56,65 @@ py_test(
py_test(
name = "test_advanced",
size = "large",
size = "medium",
srcs = ["test_advanced.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_advanced_2",
size = "medium",
srcs = ["test_advanced_2.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_advanced_3",
size = "medium",
srcs = ["test_advanced_3.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_component_failures",
size = "large",
size = "small",
srcs = ["test_component_failures.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_component_failures_2",
size = "medium",
srcs = ["test_component_failures_2.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_component_failures_3",
size = "medium",
srcs = ["test_component_failures_3.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_multinode_failures",
size = "large",
size = "medium",
srcs = ["test_multinode_failures.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_multinode_failures_direct",
name = "test_multinode_failures_2",
size = "medium",
srcs = ["test_multinode_failures_direct.py", "test_multinode_failures.py"],
tags = ["exclusive", "manual"],
srcs = ["test_multinode_failures_2.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
@@ -131,7 +163,6 @@ py_test(
size = "medium",
srcs = ["test_array.py"],
deps = ["//:ray_lib"],
flaky = 1,
)
py_test(
@@ -139,7 +170,6 @@ py_test(
size = "small",
srcs = ["test_autoscaler.py"],
deps = ["//:ray_lib"],
flaky = 1,
)
py_test(
@@ -161,7 +191,6 @@ py_test(
size = "small",
srcs = ["test_debug_tools.py"],
deps = ["//:ray_lib"],
flaky = 1,
)
py_test(
@@ -186,7 +215,6 @@ py_test(
srcs = ["test_failure_direct.py", "test_failure.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
flaky = 1,
)
py_test(
@@ -304,7 +332,6 @@ py_test(
size = "small",
srcs = ["test_queue.py"],
deps = ["//:ray_lib"],
flaky = 1,
)
py_test(
+4 -3
View File
@@ -16,9 +16,10 @@ import time
import ray
import ray.test_utils
import ray.cluster_utils
from ray import ray_constants
from ray.test_utils import run_string_as_driver
RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT"))
RAY_FORCE_DIRECT = ray_constants.direct_call_enabled()
def test_actor_init_error_propagated(ray_start_regular):
@@ -452,7 +453,7 @@ def test_multiple_actors(ray_start_regular):
def reset(self):
self.value = 0
num_actors = 20
num_actors = 5
num_increases = 50
# Create multiple actors.
actors = [Counter.remote(i) for i in range(num_actors)]
@@ -1181,7 +1182,7 @@ def test_fork_consistency(setup_queue_actor):
return ray.get(x)
# Fork num_iters times.
num_forks = 10
num_forks = 5
num_items_per_fork = 100
# Submit some tasks on new actor handles.
+2 -1
View File
@@ -16,8 +16,9 @@ import time
import ray
import ray.test_utils
import ray.cluster_utils
from ray import ray_constants
RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT"))
RAY_FORCE_DIRECT = ray_constants.direct_call_enabled()
def test_actor_deletion_with_gpus(shutdown_only):
File diff suppressed because it is too large Load Diff
+741
View File
@@ -0,0 +1,741 @@
# coding: utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import logging
import os
import sys
import time
import numpy as np
import pytest
import ray
import ray.cluster_utils
import ray.test_utils
from ray.test_utils import RayTestTimeoutException
logger = logging.getLogger(__name__)
def test_resource_constraints(shutdown_only):
num_workers = 20
ray.init(num_cpus=10, num_gpus=2)
@ray.remote(num_cpus=0)
def get_worker_id():
time.sleep(0.1)
return os.getpid()
# Attempt to wait for all of the workers to start up.
while True:
if len(
set(
ray.get([
get_worker_id.remote() for _ in range(num_workers)
]))) == num_workers:
break
time_buffer = 2
# At most 10 copies of this can run at once.
@ray.remote(num_cpus=1)
def f(n):
time.sleep(n)
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(10)])
duration = time.time() - start_time
assert duration < 0.5 + time_buffer
assert duration > 0.5
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(11)])
duration = time.time() - start_time
assert duration < 1 + time_buffer
assert duration > 1
@ray.remote(num_cpus=3)
def f(n):
time.sleep(n)
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(3)])
duration = time.time() - start_time
assert duration < 0.5 + time_buffer
assert duration > 0.5
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(4)])
duration = time.time() - start_time
assert duration < 1 + time_buffer
assert duration > 1
@ray.remote(num_gpus=1)
def f(n):
time.sleep(n)
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(2)])
duration = time.time() - start_time
assert duration < 0.5 + time_buffer
assert duration > 0.5
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(3)])
duration = time.time() - start_time
assert duration < 1 + time_buffer
assert duration > 1
start_time = time.time()
ray.get([f.remote(0.5) for _ in range(4)])
duration = time.time() - start_time
assert duration < 1 + time_buffer
assert duration > 1
def test_multi_resource_constraints(shutdown_only):
num_workers = 20
ray.init(num_cpus=10, num_gpus=10)
@ray.remote(num_cpus=0)
def get_worker_id():
time.sleep(0.1)
return os.getpid()
# Attempt to wait for all of the workers to start up.
while True:
if len(
set(
ray.get([
get_worker_id.remote() for _ in range(num_workers)
]))) == num_workers:
break
@ray.remote(num_cpus=1, num_gpus=9)
def f(n):
time.sleep(n)
@ray.remote(num_cpus=9, num_gpus=1)
def g(n):
time.sleep(n)
time_buffer = 2
start_time = time.time()
ray.get([f.remote(0.5), g.remote(0.5)])
duration = time.time() - start_time
assert duration < 0.5 + time_buffer
assert duration > 0.5
start_time = time.time()
ray.get([f.remote(0.5), f.remote(0.5)])
duration = time.time() - start_time
assert duration < 1 + time_buffer
assert duration > 1
start_time = time.time()
ray.get([g.remote(0.5), g.remote(0.5)])
duration = time.time() - start_time
assert duration < 1 + time_buffer
assert duration > 1
start_time = time.time()
ray.get([f.remote(0.5), f.remote(0.5), g.remote(0.5), g.remote(0.5)])
duration = time.time() - start_time
assert duration < 1 + time_buffer
assert duration > 1
def test_gpu_ids(shutdown_only):
num_gpus = 10
ray.init(num_cpus=10, num_gpus=num_gpus)
def get_gpu_ids(num_gpus_per_worker):
time.sleep(0.1)
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == num_gpus_per_worker
assert (os.environ["CUDA_VISIBLE_DEVICES"] == ",".join(
[str(i) for i in gpu_ids]))
for gpu_id in gpu_ids:
assert gpu_id in range(num_gpus)
return gpu_ids
f0 = ray.remote(num_gpus=0)(lambda: get_gpu_ids(0))
f1 = ray.remote(num_gpus=1)(lambda: get_gpu_ids(1))
f2 = ray.remote(num_gpus=2)(lambda: get_gpu_ids(2))
f4 = ray.remote(num_gpus=4)(lambda: get_gpu_ids(4))
f5 = ray.remote(num_gpus=5)(lambda: get_gpu_ids(5))
# Wait for all workers to start up.
@ray.remote
def f():
time.sleep(0.1)
return os.getpid()
start_time = time.time()
while True:
if len(set(ray.get([f.remote() for _ in range(10)]))) == 10:
break
if time.time() > start_time + 10:
raise RayTestTimeoutException(
"Timed out while waiting for workers to start "
"up.")
list_of_ids = ray.get([f0.remote() for _ in range(10)])
assert list_of_ids == 10 * [[]]
list_of_ids = ray.get([f1.remote() for _ in range(10)])
set_of_ids = {tuple(gpu_ids) for gpu_ids in list_of_ids}
assert set_of_ids == {(i, ) for i in range(10)}
list_of_ids = ray.get([f2.remote(), f4.remote(), f4.remote()])
all_ids = [gpu_id for gpu_ids in list_of_ids for gpu_id in gpu_ids]
assert set(all_ids) == set(range(10))
# There are only 10 GPUs, and each task uses 5 GPUs, so there should only
# be 2 tasks scheduled at a given time.
t1 = time.time()
ray.get([f5.remote() for _ in range(20)])
assert time.time() - t1 >= 10 * 0.1
# Test that actors have CUDA_VISIBLE_DEVICES set properly.
@ray.remote
class Actor0(object):
def __init__(self):
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 0
assert (os.environ["CUDA_VISIBLE_DEVICES"] == ",".join(
[str(i) for i in gpu_ids]))
# Set self.x to make sure that we got here.
self.x = 1
def test(self):
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 0
assert (os.environ["CUDA_VISIBLE_DEVICES"] == ",".join(
[str(i) for i in gpu_ids]))
return self.x
@ray.remote(num_gpus=1)
class Actor1(object):
def __init__(self):
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 1
assert (os.environ["CUDA_VISIBLE_DEVICES"] == ",".join(
[str(i) for i in gpu_ids]))
# Set self.x to make sure that we got here.
self.x = 1
def test(self):
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 1
assert (os.environ["CUDA_VISIBLE_DEVICES"] == ",".join(
[str(i) for i in gpu_ids]))
return self.x
a0 = Actor0.remote()
ray.get(a0.test.remote())
a1 = Actor1.remote()
ray.get(a1.test.remote())
def test_zero_cpus(shutdown_only):
ray.init(num_cpus=0)
# We should be able to execute a task that requires 0 CPU resources.
@ray.remote(num_cpus=0)
def f():
return 1
ray.get(f.remote())
# We should be able to create an actor that requires 0 CPU resources.
@ray.remote(num_cpus=0)
class Actor(object):
def method(self):
pass
a = Actor.remote()
x = a.method.remote()
ray.get(x)
def test_zero_cpus_actor(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=0)
cluster.add_node(num_cpus=2)
ray.init(address=cluster.address)
node_id = ray.worker.global_worker.node.unique_id
@ray.remote
class Foo(object):
def method(self):
return ray.worker.global_worker.node.unique_id
# Make sure tasks and actors run on the remote raylet.
a = Foo.remote()
assert ray.get(a.method.remote()) != node_id
def test_fractional_resources(shutdown_only):
ray.init(num_cpus=6, num_gpus=3, resources={"Custom": 1})
@ray.remote(num_gpus=0.5)
class Foo1(object):
def method(self):
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 1
return gpu_ids[0]
foos = [Foo1.remote() for _ in range(6)]
gpu_ids = ray.get([f.method.remote() for f in foos])
for i in range(3):
assert gpu_ids.count(i) == 2
del foos
@ray.remote
class Foo2(object):
def method(self):
pass
# Create an actor that requires 0.7 of the custom resource.
f1 = Foo2._remote([], {}, resources={"Custom": 0.7})
ray.get(f1.method.remote())
# Make sure that we cannot create an actor that requires 0.7 of the
# custom resource. TODO(rkn): Re-enable this once ray.wait is
# implemented.
f2 = Foo2._remote([], {}, resources={"Custom": 0.7})
ready, _ = ray.wait([f2.method.remote()], timeout=0.5)
assert len(ready) == 0
# Make sure we can start an actor that requries only 0.3 of the custom
# resource.
f3 = Foo2._remote([], {}, resources={"Custom": 0.3})
ray.get(f3.method.remote())
del f1, f3
# Make sure that we get exceptions if we submit tasks that require a
# fractional number of resources greater than 1.
@ray.remote(num_cpus=1.5)
def test():
pass
with pytest.raises(ValueError):
test.remote()
with pytest.raises(ValueError):
Foo2._remote([], {}, resources={"Custom": 1.5})
def test_multiple_raylets(ray_start_cluster):
# This test will define a bunch of tasks that can only be assigned to
# specific raylets, and we will check that they are assigned
# to the correct raylets.
cluster = ray_start_cluster
cluster.add_node(num_cpus=11, num_gpus=0)
cluster.add_node(num_cpus=5, num_gpus=5)
cluster.add_node(num_cpus=10, num_gpus=1)
ray.init(address=cluster.address)
cluster.wait_for_nodes()
# Define a bunch of remote functions that all return the socket name of
# the plasma store. Since there is a one-to-one correspondence between
# plasma stores and raylets (at least right now), this can be
# used to identify which raylet the task was assigned to.
# This must be run on the zeroth raylet.
@ray.remote(num_cpus=11)
def run_on_0():
return ray.worker.global_worker.node.plasma_store_socket_name
# This must be run on the first raylet.
@ray.remote(num_gpus=2)
def run_on_1():
return ray.worker.global_worker.node.plasma_store_socket_name
# This must be run on the second raylet.
@ray.remote(num_cpus=6, num_gpus=1)
def run_on_2():
return ray.worker.global_worker.node.plasma_store_socket_name
# This can be run anywhere.
@ray.remote(num_cpus=0, num_gpus=0)
def run_on_0_1_2():
return ray.worker.global_worker.node.plasma_store_socket_name
# This must be run on the first or second raylet.
@ray.remote(num_gpus=1)
def run_on_1_2():
return ray.worker.global_worker.node.plasma_store_socket_name
# This must be run on the zeroth or second raylet.
@ray.remote(num_cpus=8)
def run_on_0_2():
return ray.worker.global_worker.node.plasma_store_socket_name
def run_lots_of_tasks():
names = []
results = []
for i in range(100):
index = np.random.randint(6)
if index == 0:
names.append("run_on_0")
results.append(run_on_0.remote())
elif index == 1:
names.append("run_on_1")
results.append(run_on_1.remote())
elif index == 2:
names.append("run_on_2")
results.append(run_on_2.remote())
elif index == 3:
names.append("run_on_0_1_2")
results.append(run_on_0_1_2.remote())
elif index == 4:
names.append("run_on_1_2")
results.append(run_on_1_2.remote())
elif index == 5:
names.append("run_on_0_2")
results.append(run_on_0_2.remote())
return names, results
client_table = ray.nodes()
store_names = []
store_names += [
client["ObjectStoreSocketName"] for client in client_table
if client["Resources"].get("GPU", 0) == 0
]
store_names += [
client["ObjectStoreSocketName"] for client in client_table
if client["Resources"].get("GPU", 0) == 5
]
store_names += [
client["ObjectStoreSocketName"] for client in client_table
if client["Resources"].get("GPU", 0) == 1
]
assert len(store_names) == 3
def validate_names_and_results(names, results):
for name, result in zip(names, ray.get(results)):
if name == "run_on_0":
assert result in [store_names[0]]
elif name == "run_on_1":
assert result in [store_names[1]]
elif name == "run_on_2":
assert result in [store_names[2]]
elif name == "run_on_0_1_2":
assert (result in [
store_names[0], store_names[1], store_names[2]
])
elif name == "run_on_1_2":
assert result in [store_names[1], store_names[2]]
elif name == "run_on_0_2":
assert result in [store_names[0], store_names[2]]
else:
raise Exception("This should be unreachable.")
assert set(ray.get(results)) == set(store_names)
names, results = run_lots_of_tasks()
validate_names_and_results(names, results)
# Make sure the same thing works when this is nested inside of a task.
@ray.remote
def run_nested1():
names, results = run_lots_of_tasks()
return names, results
@ray.remote
def run_nested2():
names, results = ray.get(run_nested1.remote())
return names, results
names, results = ray.get(run_nested2.remote())
validate_names_and_results(names, results)
def test_custom_resources(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=3, resources={"CustomResource": 0})
cluster.add_node(num_cpus=3, resources={"CustomResource": 1})
ray.init(address=cluster.address)
@ray.remote
def f():
time.sleep(0.001)
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource": 1})
def g():
time.sleep(0.001)
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource": 1})
def h():
ray.get([f.remote() for _ in range(5)])
return ray.worker.global_worker.node.unique_id
# The f tasks should be scheduled on both raylets.
assert len(set(ray.get([f.remote() for _ in range(500)]))) == 2
node_id = ray.worker.global_worker.node.unique_id
# The g tasks should be scheduled only on the second raylet.
raylet_ids = set(ray.get([g.remote() for _ in range(50)]))
assert len(raylet_ids) == 1
assert list(raylet_ids)[0] != node_id
# Make sure that resource bookkeeping works when a task that uses a
# custom resources gets blocked.
ray.get([h.remote() for _ in range(5)])
def test_node_id_resource(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(num_cpus=3)
cluster.add_node(num_cpus=3)
ray.init(address=cluster.address)
local_node = ray.state.current_node_id()
# Note that these will have the same IP in the test cluster
assert len(ray.state.node_ids()) == 2
assert local_node in ray.state.node_ids()
@ray.remote(resources={local_node: 1})
def f():
return ray.state.current_node_id()
# Check the node id resource is automatically usable for scheduling.
assert ray.get(f.remote()) == ray.state.current_node_id()
def test_two_custom_resources(ray_start_cluster):
cluster = ray_start_cluster
cluster.add_node(
num_cpus=3, resources={
"CustomResource1": 1,
"CustomResource2": 2
})
cluster.add_node(
num_cpus=3, resources={
"CustomResource1": 3,
"CustomResource2": 4
})
ray.init(address=cluster.address)
@ray.remote(resources={"CustomResource1": 1})
def f():
time.sleep(0.001)
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource2": 1})
def g():
time.sleep(0.001)
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource1": 1, "CustomResource2": 3})
def h():
time.sleep(0.001)
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource1": 4})
def j():
time.sleep(0.001)
return ray.worker.global_worker.node.unique_id
@ray.remote(resources={"CustomResource3": 1})
def k():
time.sleep(0.001)
return ray.worker.global_worker.node.unique_id
# The f and g tasks should be scheduled on both raylets.
assert len(set(ray.get([f.remote() for _ in range(500)]))) == 2
assert len(set(ray.get([g.remote() for _ in range(500)]))) == 2
node_id = ray.worker.global_worker.node.unique_id
# The h tasks should be scheduled only on the second raylet.
raylet_ids = set(ray.get([h.remote() for _ in range(50)]))
assert len(raylet_ids) == 1
assert list(raylet_ids)[0] != node_id
# Make sure that tasks with unsatisfied custom resource requirements do
# not get scheduled.
ready_ids, remaining_ids = ray.wait([j.remote(), k.remote()], timeout=0.5)
assert ready_ids == []
def test_many_custom_resources(shutdown_only):
num_custom_resources = 10000
total_resources = {
str(i): np.random.randint(1, 7)
for i in range(num_custom_resources)
}
ray.init(num_cpus=5, resources=total_resources)
def f():
return 1
remote_functions = []
for _ in range(20):
num_resources = np.random.randint(0, num_custom_resources + 1)
permuted_resources = np.random.permutation(
num_custom_resources)[:num_resources]
random_resources = {
str(i): total_resources[str(i)]
for i in permuted_resources
}
remote_function = ray.remote(resources=random_resources)(f)
remote_functions.append(remote_function)
remote_functions.append(ray.remote(f))
remote_functions.append(ray.remote(resources=total_resources)(f))
results = []
for remote_function in remote_functions:
results.append(remote_function.remote())
results.append(remote_function.remote())
results.append(remote_function.remote())
ray.get(results)
# TODO: 5 retry attempts may be too little for Travis and we may need to
# increase it if this test begins to be flaky on Travis.
def test_zero_capacity_deletion_semantics(shutdown_only):
ray.init(num_cpus=2, num_gpus=1, resources={"test_resource": 1})
def test():
resources = ray.available_resources()
MAX_RETRY_ATTEMPTS = 5
retry_count = 0
del resources["memory"]
del resources["object_store_memory"]
for key in list(resources.keys()):
if key.startswith("node:"):
del resources[key]
while resources and retry_count < MAX_RETRY_ATTEMPTS:
time.sleep(0.1)
resources = ray.available_resources()
retry_count += 1
if retry_count >= MAX_RETRY_ATTEMPTS:
raise RuntimeError(
"Resources were available even after five retries.", resources)
return resources
function = ray.remote(
num_cpus=2, num_gpus=1, resources={"test_resource": 1})(test)
cluster_resources = ray.get(function.remote())
# All cluster resources should be utilized and
# cluster_resources must be empty
assert cluster_resources == {}
@pytest.fixture
def save_gpu_ids_shutdown_only():
# Record the curent value of this environment variable so that we can
# reset it after the test.
original_gpu_ids = os.environ.get("CUDA_VISIBLE_DEVICES", None)
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
# Reset the environment variable.
if original_gpu_ids is not None:
os.environ["CUDA_VISIBLE_DEVICES"] = original_gpu_ids
else:
del os.environ["CUDA_VISIBLE_DEVICES"]
def test_specific_gpus(save_gpu_ids_shutdown_only):
allowed_gpu_ids = [4, 5, 6]
os.environ["CUDA_VISIBLE_DEVICES"] = ",".join(
[str(i) for i in allowed_gpu_ids])
ray.init(num_gpus=3)
@ray.remote(num_gpus=1)
def f():
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 1
assert gpu_ids[0] in allowed_gpu_ids
@ray.remote(num_gpus=2)
def g():
gpu_ids = ray.get_gpu_ids()
assert len(gpu_ids) == 2
assert gpu_ids[0] in allowed_gpu_ids
assert gpu_ids[1] in allowed_gpu_ids
ray.get([f.remote() for _ in range(100)])
ray.get([g.remote() for _ in range(100)])
def test_blocking_tasks(ray_start_regular):
@ray.remote
def f(i, j):
return (i, j)
@ray.remote
def g(i):
# Each instance of g submits and blocks on the result of another
# remote task.
object_ids = [f.remote(i, j) for j in range(2)]
return ray.get(object_ids)
@ray.remote
def h(i):
# Each instance of g submits and blocks on the result of another
# remote task using ray.wait.
object_ids = [f.remote(i, j) for j in range(2)]
return ray.wait(object_ids, num_returns=len(object_ids))
ray.get([h.remote(i) for i in range(4)])
@ray.remote
def _sleep(i):
time.sleep(0.01)
return (i)
@ray.remote
def sleep():
# Each instance of sleep submits and blocks on the result of
# another remote task, which takes some time to execute.
ray.get([_sleep.remote(i) for i in range(10)])
ray.get(sleep.remote())
def test_max_call_tasks(ray_start_regular):
@ray.remote(max_calls=1)
def f():
return os.getpid()
pid = ray.get(f.remote())
ray.test_utils.wait_for_pid_to_exit(pid)
@ray.remote(max_calls=2)
def f():
return os.getpid()
pid1 = ray.get(f.remote())
pid2 = ray.get(f.remote())
assert pid1 == pid2
ray.test_utils.wait_for_pid_to_exit(pid1)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
+731
View File
@@ -0,0 +1,731 @@
# coding: utf-8
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import glob
import logging
import os
import setproctitle
import shutil
import sys
import socket
import subprocess
import tempfile
import time
import numpy as np
import pickle
import pytest
import ray
from ray import signature
import ray.ray_constants as ray_constants
import ray.cluster_utils
import ray.test_utils
from ray.test_utils import RayTestTimeoutException
logger = logging.getLogger(__name__)
def attempt_to_load_balance(remote_function,
args,
total_tasks,
num_nodes,
minimum_count,
num_attempts=100):
attempts = 0
while attempts < num_attempts:
locations = ray.get(
[remote_function.remote(*args) for _ in range(total_tasks)])
names = set(locations)
counts = [locations.count(name) for name in names]
logger.info("Counts are {}.".format(counts))
if (len(names) == num_nodes
and all(count >= minimum_count for count in counts)):
break
attempts += 1
assert attempts < num_attempts
def test_load_balancing(ray_start_cluster):
# This test ensures that tasks are being assigned to all raylets
# in a roughly equal manner.
cluster = ray_start_cluster
num_nodes = 3
num_cpus = 7
for _ in range(num_nodes):
cluster.add_node(num_cpus=num_cpus)
ray.init(address=cluster.address)
@ray.remote
def f():
time.sleep(0.01)
return ray.worker.global_worker.node.unique_id
attempt_to_load_balance(f, [], 100, num_nodes, 10)
attempt_to_load_balance(f, [], 1000, num_nodes, 100)
def test_load_balancing_with_dependencies(ray_start_cluster):
# This test ensures that tasks are being assigned to all raylets in a
# roughly equal manner even when the tasks have dependencies.
cluster = ray_start_cluster
num_nodes = 3
for _ in range(num_nodes):
cluster.add_node(num_cpus=1)
ray.init(address=cluster.address)
@ray.remote
def f(x):
time.sleep(0.010)
return ray.worker.global_worker.node.unique_id
# This object will be local to one of the raylets. Make sure
# this doesn't prevent tasks from being scheduled on other raylets.
x = ray.put(np.zeros(1000000))
attempt_to_load_balance(f, [x], 100, num_nodes, 25)
def wait_for_num_tasks(num_tasks, timeout=10):
start_time = time.time()
while time.time() - start_time < timeout:
if len(ray.tasks()) >= num_tasks:
return
time.sleep(0.1)
raise RayTestTimeoutException("Timed out while waiting for global state.")
def wait_for_num_objects(num_objects, timeout=10):
start_time = time.time()
while time.time() - start_time < timeout:
if len(ray.objects()) >= num_objects:
return
time.sleep(0.1)
raise RayTestTimeoutException("Timed out while waiting for global state.")
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="New GCS API doesn't have a Python API yet.")
@pytest.mark.skipif(
ray_constants.direct_call_enabled(), reason="state API not supported")
def test_global_state_api(shutdown_only):
error_message = ("The ray global state API cannot be used "
"before ray.init has been called.")
with pytest.raises(Exception, match=error_message):
ray.objects()
with pytest.raises(Exception, match=error_message):
ray.tasks()
with pytest.raises(Exception, match=error_message):
ray.nodes()
with pytest.raises(Exception, match=error_message):
ray.jobs()
ray.init(num_cpus=5, num_gpus=3, resources={"CustomResource": 1})
assert ray.cluster_resources()["CPU"] == 5
assert ray.cluster_resources()["GPU"] == 3
assert ray.cluster_resources()["CustomResource"] == 1
assert ray.objects() == {}
job_id = ray.utils.compute_job_id_from_driver(
ray.WorkerID(ray.worker.global_worker.worker_id))
driver_task_id = ray.worker.global_worker.current_task_id.hex()
# One task is put in the task table which corresponds to this driver.
wait_for_num_tasks(1)
task_table = ray.tasks()
assert len(task_table) == 1
assert driver_task_id == list(task_table.keys())[0]
task_spec = task_table[driver_task_id]["TaskSpec"]
nil_unique_id_hex = ray.UniqueID.nil().hex()
nil_actor_id_hex = ray.ActorID.nil().hex()
assert task_spec["TaskID"] == driver_task_id
assert task_spec["ActorID"] == nil_actor_id_hex
assert task_spec["Args"] == []
assert task_spec["JobID"] == job_id.hex()
assert task_spec["FunctionID"] == nil_unique_id_hex
assert task_spec["ReturnObjectIDs"] == []
client_table = ray.nodes()
node_ip_address = ray.worker.global_worker.node_ip_address
assert len(client_table) == 1
assert client_table[0]["NodeManagerAddress"] == node_ip_address
@ray.remote
def f(*xs):
return 1
x_id = ray.put(1)
result_id = f.remote(1, "hi", x_id)
# Wait for one additional task to complete.
wait_for_num_tasks(1 + 1)
task_table = ray.tasks()
assert len(task_table) == 1 + 1
task_id_set = set(task_table.keys())
task_id_set.remove(driver_task_id)
task_id = list(task_id_set)[0]
task_spec = task_table[task_id]["TaskSpec"]
assert task_spec["ActorID"] == nil_actor_id_hex
assert task_spec["Args"] == [
signature.DUMMY_TYPE, 1, signature.DUMMY_TYPE, "hi",
signature.DUMMY_TYPE, x_id
]
assert task_spec["JobID"] == job_id.hex()
assert task_spec["ReturnObjectIDs"] == [result_id]
assert task_table[task_id] == ray.tasks(task_id)
# Wait for two objects, one for the x_id and one for result_id.
wait_for_num_objects(2)
def wait_for_object_table():
timeout = 10
start_time = time.time()
while time.time() - start_time < timeout:
object_table = ray.objects()
tables_ready = (object_table[x_id]["ManagerIDs"] is not None and
object_table[result_id]["ManagerIDs"] is not None)
if tables_ready:
return
time.sleep(0.1)
raise RayTestTimeoutException(
"Timed out while waiting for object table to "
"update.")
object_table = ray.objects()
assert len(object_table) == 2
assert object_table[x_id] == ray.objects(x_id)
object_table_entry = ray.objects(result_id)
assert object_table[result_id] == object_table_entry
job_table = ray.jobs()
assert len(job_table) == 1
assert job_table[0]["JobID"] == job_id.hex()
assert job_table[0]["NodeManagerAddress"] == node_ip_address
# TODO(rkn): Pytest actually has tools for capturing stdout and stderr, so we
# should use those, but they seem to conflict with Ray's use of faulthandler.
class CaptureOutputAndError(object):
"""Capture stdout and stderr of some span.
This can be used as follows.
captured = {}
with CaptureOutputAndError(captured):
# Do stuff.
# Access captured["out"] and captured["err"].
"""
def __init__(self, captured_output_and_error):
if sys.version_info >= (3, 0):
import io
self.output_buffer = io.StringIO()
self.error_buffer = io.StringIO()
else:
import cStringIO
self.output_buffer = cStringIO.StringIO()
self.error_buffer = cStringIO.StringIO()
self.captured_output_and_error = captured_output_and_error
def __enter__(self):
sys.stdout.flush()
sys.stderr.flush()
self.old_stdout = sys.stdout
self.old_stderr = sys.stderr
sys.stdout = self.output_buffer
sys.stderr = self.error_buffer
def __exit__(self, exc_type, exc_value, traceback):
sys.stdout.flush()
sys.stderr.flush()
sys.stdout = self.old_stdout
sys.stderr = self.old_stderr
self.captured_output_and_error["out"] = self.output_buffer.getvalue()
self.captured_output_and_error["err"] = self.error_buffer.getvalue()
def test_logging_to_driver(shutdown_only):
ray.init(num_cpus=1, log_to_driver=True)
@ray.remote
def f():
# It's important to make sure that these print statements occur even
# without calling sys.stdout.flush() and sys.stderr.flush().
for i in range(100):
print(i)
print(100 + i, file=sys.stderr)
captured = {}
with CaptureOutputAndError(captured):
ray.get(f.remote())
time.sleep(1)
output_lines = captured["out"]
for i in range(200):
assert str(i) in output_lines
# TODO(rkn): Check that no additional logs appear beyond what we expect
# and that there are no duplicate logs. Once we address the issue
# described in https://github.com/ray-project/ray/pull/5462, we should
# also check that nothing is logged to stderr.
def test_not_logging_to_driver(shutdown_only):
ray.init(num_cpus=1, log_to_driver=False)
@ray.remote
def f():
for i in range(100):
print(i)
print(100 + i, file=sys.stderr)
sys.stdout.flush()
sys.stderr.flush()
captured = {}
with CaptureOutputAndError(captured):
ray.get(f.remote())
time.sleep(1)
output_lines = captured["out"]
assert len(output_lines) == 0
# TODO(rkn): Check that no additional logs appear beyond what we expect
# and that there are no duplicate logs. Once we address the issue
# described in https://github.com/ray-project/ray/pull/5462, we should
# also check that nothing is logged to stderr.
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="New GCS API doesn't have a Python API yet.")
def test_workers(shutdown_only):
num_workers = 3
ray.init(num_cpus=num_workers)
@ray.remote
def f():
return id(ray.worker.global_worker), os.getpid()
# Wait until all of the workers have started.
worker_ids = set()
while len(worker_ids) != num_workers:
worker_ids = set(ray.get([f.remote() for _ in range(10)]))
def test_specific_job_id():
dummy_driver_id = ray.JobID.from_int(1)
ray.init(num_cpus=1, job_id=dummy_driver_id)
# in driver
assert dummy_driver_id == ray._get_runtime_context().current_driver_id
# in worker
@ray.remote
def f():
return ray._get_runtime_context().current_driver_id
assert dummy_driver_id == ray.get(f.remote())
ray.shutdown()
def test_object_id_properties():
id_bytes = b"00112233445566778899"
object_id = ray.ObjectID(id_bytes)
assert object_id.binary() == id_bytes
object_id = ray.ObjectID.nil()
assert object_id.is_nil()
with pytest.raises(ValueError, match=r".*needs to have length 20.*"):
ray.ObjectID(id_bytes + b"1234")
with pytest.raises(ValueError, match=r".*needs to have length 20.*"):
ray.ObjectID(b"0123456789")
object_id = ray.ObjectID.from_random()
assert not object_id.is_nil()
assert object_id.binary() != id_bytes
id_dumps = pickle.dumps(object_id)
id_from_dumps = pickle.loads(id_dumps)
assert id_from_dumps == object_id
@pytest.fixture
def shutdown_only_with_initialization_check():
yield None
# The code after the yield will run as teardown code.
ray.shutdown()
assert not ray.is_initialized()
def test_initialized(shutdown_only_with_initialization_check):
assert not ray.is_initialized()
ray.init(num_cpus=0)
assert ray.is_initialized()
def test_initialized_local_mode(shutdown_only_with_initialization_check):
assert not ray.is_initialized()
ray.init(num_cpus=0, local_mode=True)
assert ray.is_initialized()
def test_wait_reconstruction(shutdown_only):
ray.init(num_cpus=1, object_store_memory=int(10**8))
@ray.remote
def f():
return np.zeros(6 * 10**7, dtype=np.uint8)
x_id = f.remote()
ray.wait([x_id])
ray.wait([f.remote()])
assert not ray.worker.global_worker.core_worker.object_exists(x_id)
ready_ids, _ = ray.wait([x_id])
assert len(ready_ids) == 1
def test_ray_setproctitle(ray_start_2_cpus):
@ray.remote
class UniqueName(object):
def __init__(self):
assert setproctitle.getproctitle() == "ray::UniqueName.__init__()"
def f(self):
assert setproctitle.getproctitle() == "ray::UniqueName.f()"
@ray.remote
def unique_1():
assert "unique_1" in setproctitle.getproctitle()
actor = UniqueName.remote()
ray.get(actor.f.remote())
ray.get(unique_1.remote())
def test_duplicate_error_messages(shutdown_only):
ray.init(num_cpus=0)
driver_id = ray.WorkerID.nil()
error_data = ray.gcs_utils.construct_error_message(driver_id, "test",
"message", 0)
# Push the same message to the GCS twice (they are the same because we
# do not include a timestamp).
r = ray.worker.global_worker.redis_client
r.execute_command("RAY.TABLE_APPEND",
ray.gcs_utils.TablePrefix.Value("ERROR_INFO"),
ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB"),
driver_id.binary(), error_data)
# Before https://github.com/ray-project/ray/pull/3316 this would
# give an error
r.execute_command("RAY.TABLE_APPEND",
ray.gcs_utils.TablePrefix.Value("ERROR_INFO"),
ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB"),
driver_id.binary(), error_data)
@pytest.mark.skipif(
os.getenv("TRAVIS") is None,
reason="This test should only be run on Travis.")
def test_ray_stack(ray_start_2_cpus):
def unique_name_1():
time.sleep(1000)
@ray.remote
def unique_name_2():
time.sleep(1000)
@ray.remote
def unique_name_3():
unique_name_1()
unique_name_2.remote()
unique_name_3.remote()
success = False
start_time = time.time()
while time.time() - start_time < 30:
# Attempt to parse the "ray stack" call.
output = ray.utils.decode(subprocess.check_output(["ray", "stack"]))
if ("unique_name_1" in output and "unique_name_2" in output
and "unique_name_3" in output):
success = True
break
if not success:
raise Exception("Failed to find necessary information with "
"'ray stack'")
def test_pandas_parquet_serialization():
# Only test this if pandas is installed
pytest.importorskip("pandas")
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
tempdir = tempfile.mkdtemp()
filename = os.path.join(tempdir, "parquet-test")
pd.DataFrame({"col1": [0, 1], "col2": [0, 1]}).to_parquet(filename)
with open(os.path.join(tempdir, "parquet-compression"), "wb") as f:
table = pa.Table.from_arrays([pa.array([1, 2, 3])], ["hello"])
pq.write_table(table, f, compression="lz4")
# Clean up
shutil.rmtree(tempdir)
def test_socket_dir_not_existing(shutdown_only):
random_name = ray.ObjectID.from_random().hex()
temp_raylet_socket_dir = "/tmp/ray/tests/{}".format(random_name)
temp_raylet_socket_name = os.path.join(temp_raylet_socket_dir,
"raylet_socket")
ray.init(num_cpus=1, raylet_socket_name=temp_raylet_socket_name)
def test_raylet_is_robust_to_random_messages(ray_start_regular):
node_manager_address = None
node_manager_port = None
for client in ray.nodes():
if "NodeManagerAddress" in client:
node_manager_address = client["NodeManagerAddress"]
node_manager_port = client["NodeManagerPort"]
assert node_manager_address
assert node_manager_port
# Try to bring down the node manager:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((node_manager_address, node_manager_port))
s.send(1000 * b"asdf")
@ray.remote
def f():
return 1
assert ray.get(f.remote()) == 1
def test_non_ascii_comment(ray_start_regular):
@ray.remote
def f():
# 日本語 Japanese comment
return 1
assert ray.get(f.remote()) == 1
def test_shutdown_disconnect_global_state():
ray.init(num_cpus=0)
ray.shutdown()
with pytest.raises(Exception) as e:
ray.objects()
assert str(e.value).endswith("ray.init has been called.")
@pytest.mark.parametrize(
"ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True)
def test_put_pins_object(ray_start_object_store_memory):
x_id = ray.put("HI")
x_copy = ray.ObjectID(x_id.binary())
assert ray.get(x_copy) == "HI"
# x cannot be evicted since x_id pins it
for _ in range(10):
ray.put(np.zeros(10 * 1024 * 1024))
assert ray.get(x_id) == "HI"
assert ray.get(x_copy) == "HI"
# now it can be evicted since x_id pins it but x_copy does not
del x_id
for _ in range(10):
ray.put(np.zeros(10 * 1024 * 1024))
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(x_copy)
# weakref put
y_id = ray.put("HI", weakref=True)
for _ in range(10):
ray.put(np.zeros(10 * 1024 * 1024))
with pytest.raises(ray.exceptions.UnreconstructableError):
ray.get(y_id)
@ray.remote
def check_no_buffer_ref(x):
assert x[0].get_buffer_ref() is None
z_id = ray.put("HI")
assert z_id.get_buffer_ref() is not None
ray.get(check_no_buffer_ref.remote([z_id]))
@pytest.mark.parametrize(
"ray_start_object_store_memory", [150 * 1024 * 1024], indirect=True)
def test_redis_lru_with_set(ray_start_object_store_memory):
x = np.zeros(8 * 10**7, dtype=np.uint8)
x_id = ray.put(x, weakref=True)
# Remove the object from the object table to simulate Redis LRU eviction.
removed = False
start_time = time.time()
while time.time() < start_time + 10:
if ray.state.state.redis_clients[0].delete(b"OBJECT" +
x_id.binary()) == 1:
removed = True
break
assert removed
# Now evict the object from the object store.
ray.put(x) # This should not crash.
def test_decorated_function(ray_start_regular):
def function_invocation_decorator(f):
def new_f(args, kwargs):
# Reverse the arguments.
return f(args[::-1], {"d": 5}), kwargs
return new_f
def f(a, b, c, d=None):
return a, b, c, d
f.__ray_invocation_decorator__ = function_invocation_decorator
f = ray.remote(f)
result_id, kwargs = f.remote(1, 2, 3, d=4)
assert kwargs == {"d": 4}
assert ray.get(result_id) == (3, 2, 1, 5)
def test_get_postprocess(ray_start_regular):
def get_postprocessor(object_ids, values):
return [value for value in values if value > 0]
ray.worker.global_worker._post_get_hooks.append(get_postprocessor)
assert ray.get(
[ray.put(i) for i in [0, 1, 3, 5, -1, -3, 4]]) == [1, 3, 5, 4]
def test_export_after_shutdown(ray_start_regular):
# This test checks that we can use actor and remote function definitions
# across multiple Ray sessions.
@ray.remote
def f():
pass
@ray.remote
class Actor(object):
def method(self):
pass
ray.get(f.remote())
a = Actor.remote()
ray.get(a.method.remote())
ray.shutdown()
# Start Ray and use the remote function and actor again.
ray.init(num_cpus=1)
ray.get(f.remote())
a = Actor.remote()
ray.get(a.method.remote())
ray.shutdown()
# Start Ray again and make sure that these definitions can be exported from
# workers.
ray.init(num_cpus=2)
@ray.remote
def export_definitions_from_worker(remote_function, actor_class):
ray.get(remote_function.remote())
actor_handle = actor_class.remote()
ray.get(actor_handle.method.remote())
ray.get(export_definitions_from_worker.remote(f, Actor))
def test_invalid_unicode_in_worker_log(shutdown_only):
info = ray.init(num_cpus=1)
logs_dir = os.path.join(info["session_dir"], "logs")
# Wait till first worker log file is created.
while True:
log_file_paths = glob.glob("{}/worker*.out".format(logs_dir))
if len(log_file_paths) == 0:
time.sleep(0.2)
else:
break
with open(log_file_paths[0], "wb") as f:
f.write(b"\xe5abc\nline2\nline3\n")
f.write(b"\xe5abc\nline2\nline3\n")
f.write(b"\xe5abc\nline2\nline3\n")
f.flush()
# Wait till the log monitor reads the file.
time.sleep(1.0)
# Make sure that nothing has died.
assert ray.services.remaining_processes_alive()
@pytest.mark.skip(reason="This test is too expensive to run.")
def test_move_log_files_to_old(shutdown_only):
info = ray.init(num_cpus=1)
logs_dir = os.path.join(info["session_dir"], "logs")
@ray.remote
class Actor(object):
def f(self):
print("function f finished")
# First create a temporary actor.
actors = [
Actor.remote() for i in range(ray_constants.LOG_MONITOR_MAX_OPEN_FILES)
]
ray.get([a.f.remote() for a in actors])
# Make sure no log files are in the "old" directory before the actors
# are killed.
assert len(glob.glob("{}/old/worker*.out".format(logs_dir))) == 0
# Now kill the actors so the files get moved to logs/old/.
[a.__ray_terminate__.remote() for a in actors]
while True:
log_file_paths = glob.glob("{}/old/worker*.out".format(logs_dir))
if len(log_file_paths) > 0:
with open(log_file_paths[0], "r") as f:
assert "function f finished\n" in f.readlines()
break
# Make sure that nothing has died.
assert ray.services.remaining_processes_alive()
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
+5 -261
View File
@@ -2,20 +2,15 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import os
import signal
import sys
import time
import numpy as np
import pytest
import ray
import ray.ray_constants as ray_constants
from ray.cluster_utils import Cluster
from ray.test_utils import (run_string_as_driver_nonblocking,
RayTestTimeoutException)
from ray.test_utils import run_string_as_driver_nonblocking
# This test checks that when a worker dies in the middle of a get, the plasma
@@ -59,7 +54,7 @@ def test_dying_worker_get(ray_start_2_cpus):
assert len(ready_ids) == 0
# Seal the object so the store attempts to notify the worker that the
# get has been fulfilled.
ray.worker.global_worker.put_object(1, x_id)
ray.worker.global_worker.put_object(1, x_id.with_plasma_transport_type())
time.sleep(0.1)
# Make sure that nothing has died.
@@ -102,7 +97,7 @@ ray.get(ray.ObjectID(ray.utils.hex_to_binary("{}")))
assert len(ready_ids) == 0
# Seal the object so the store attempts to notify the worker that the
# get has been fulfilled.
ray.worker.global_worker.put_object(1, x_id)
ray.worker.global_worker.put_object(1, x_id.with_plasma_transport_type())
time.sleep(0.1)
# Make sure that nothing has died.
@@ -142,7 +137,7 @@ def test_dying_worker_wait(ray_start_2_cpus):
time.sleep(0.1)
# Create the object.
ray.worker.global_worker.put_object(1, x_id)
ray.worker.global_worker.put_object(1, x_id.with_plasma_transport_type())
time.sleep(0.1)
# Make sure that nothing has died.
@@ -185,264 +180,13 @@ ray.wait([ray.ObjectID(ray.utils.hex_to_binary("{}"))])
assert len(ready_ids) == 0
# Seal the object so the store attempts to notify the worker that the
# wait can return.
ray.worker.global_worker.put_object(1, x_id)
ray.worker.global_worker.put_object(1, x_id.with_plasma_transport_type())
time.sleep(0.1)
# Make sure that nothing has died.
assert ray.services.remaining_processes_alive()
@pytest.fixture(params=[(1, 4), (4, 4)])
def ray_start_workers_separate_multinode(request):
num_nodes = request.param[0]
num_initial_workers = request.param[1]
# Start the Ray processes.
cluster = Cluster()
for _ in range(num_nodes):
cluster.add_node(num_cpus=num_initial_workers)
ray.init(address=cluster.address)
yield num_nodes, num_initial_workers
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
def test_worker_failed(ray_start_workers_separate_multinode):
num_nodes, num_initial_workers = (ray_start_workers_separate_multinode)
@ray.remote
def get_pids():
time.sleep(0.25)
return os.getpid()
start_time = time.time()
pids = set()
while len(pids) < num_nodes * num_initial_workers:
new_pids = ray.get([
get_pids.remote()
for _ in range(2 * num_nodes * num_initial_workers)
])
for pid in new_pids:
pids.add(pid)
if time.time() - start_time > 60:
raise RayTestTimeoutException(
"Timed out while waiting to get worker PIDs.")
@ray.remote
def f(x):
time.sleep(0.5)
return x
# Submit more tasks than there are workers so that all workers and
# cores are utilized.
object_ids = [f.remote(i) for i in range(num_initial_workers * num_nodes)]
object_ids += [f.remote(object_id) for object_id in object_ids]
# Allow the tasks some time to begin executing.
time.sleep(0.1)
# Kill the workers as the tasks execute.
for pid in pids:
os.kill(pid, signal.SIGKILL)
time.sleep(0.1)
# Make sure that we either get the object or we get an appropriate
# exception.
for object_id in object_ids:
try:
ray.get(object_id)
except (ray.exceptions.RayTaskError, ray.exceptions.RayWorkerError):
pass
def _test_component_failed(cluster, component_type):
"""Kill a component on all worker nodes and check workload succeeds."""
# Submit many tasks with many dependencies.
@ray.remote
def f(x):
return x
@ray.remote
def g(*xs):
return 1
# Kill the component on all nodes except the head node as the tasks
# execute. Do this in a loop while submitting tasks between each
# component failure.
time.sleep(0.1)
worker_nodes = cluster.list_all_nodes()[1:]
assert len(worker_nodes) > 0
for node in worker_nodes:
process = node.all_processes[component_type][0].process
# Submit a round of tasks with many dependencies.
x = 1
for _ in range(1000):
x = f.remote(x)
xs = [g.remote(1)]
for _ in range(100):
xs.append(g.remote(*xs))
xs.append(g.remote(1))
# Kill a component on one of the nodes.
process.terminate()
time.sleep(1)
process.kill()
process.wait()
assert not process.poll() is None
# Make sure that we can still get the objects after the
# executing tasks died.
ray.get(x)
ray.get(xs)
def check_components_alive(cluster, component_type, check_component_alive):
"""Check that a given component type is alive on all worker nodes."""
worker_nodes = cluster.list_all_nodes()[1:]
assert len(worker_nodes) > 0
for node in worker_nodes:
process = node.all_processes[component_type][0].process
if check_component_alive:
assert process.poll() is None
else:
print("waiting for " + component_type + " with PID " +
str(process.pid) + "to terminate")
process.wait()
print("done waiting for " + component_type + " with PID " +
str(process.pid) + "to terminate")
assert not process.poll() is None
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 8,
"num_nodes": 4,
"_internal_config": json.dumps({
"num_heartbeats_timeout": 100
}),
}],
indirect=True)
def test_raylet_failed(ray_start_cluster):
cluster = ray_start_cluster
# Kill all raylets on worker nodes.
_test_component_failed(cluster, ray_constants.PROCESS_TYPE_RAYLET)
# The plasma stores should still be alive on the worker nodes.
check_components_alive(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE,
True)
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 8,
"num_nodes": 2,
"_internal_config": json.dumps({
"num_heartbeats_timeout": 100
}),
}],
indirect=True)
def test_plasma_store_failed(ray_start_cluster):
cluster = ray_start_cluster
# Kill all plasma stores on worker nodes.
_test_component_failed(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE)
# No processes should be left alive on the worker nodes.
check_components_alive(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE,
False)
check_components_alive(cluster, ray_constants.PROCESS_TYPE_RAYLET, False)
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 4,
"num_nodes": 3,
"do_init": True
}],
indirect=True)
def test_actor_creation_node_failure(ray_start_cluster):
# TODO(swang): Refactor test_raylet_failed, etc to reuse the below code.
cluster = ray_start_cluster
@ray.remote
class Child(object):
def __init__(self, death_probability):
self.death_probability = death_probability
def ping(self):
# Exit process with some probability.
exit_chance = np.random.rand()
if exit_chance < self.death_probability:
sys.exit(-1)
num_children = 50
# Children actors will die about half the time.
death_probability = 0.5
children = [Child.remote(death_probability) for _ in range(num_children)]
while len(cluster.list_all_nodes()) > 1:
for j in range(2):
# Submit some tasks on the actors. About half of the actors will
# fail.
children_out = [child.ping.remote() for child in children]
# Wait a while for all the tasks to complete. This should trigger
# reconstruction for any actor creation tasks that were forwarded
# to nodes that then failed.
ready, _ = ray.wait(
children_out, num_returns=len(children_out), timeout=5 * 60.0)
assert len(ready) == len(children_out)
# Replace any actors that died.
for i, out in enumerate(children_out):
try:
ray.get(out)
except ray.exceptions.RayActorError:
children[i] = Child.remote(death_probability)
# Remove a node. Any actor creation tasks that were forwarded to this
# node must be reconstructed.
cluster.remove_node(cluster.list_all_nodes()[-1])
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_driver_lives_sequential(ray_start_regular):
ray.worker._global_node.kill_raylet()
ray.worker._global_node.kill_plasma_store()
ray.worker._global_node.kill_log_monitor()
ray.worker._global_node.kill_monitor()
ray.worker._global_node.kill_raylet_monitor()
# If the driver can reach the tearDown method, then it is still alive.
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_driver_lives_parallel(ray_start_regular):
all_processes = ray.worker._global_node.all_processes
process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] +
all_processes[ray_constants.PROCESS_TYPE_RAYLET] +
all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] +
all_processes[ray_constants.PROCESS_TYPE_MONITOR] +
all_processes[ray_constants.PROCESS_TYPE_RAYLET_MONITOR])
assert len(process_infos) == 5
# Kill all the components in parallel.
for process_info in process_infos:
process_info.process.terminate()
time.sleep(0.1)
for process_info in process_infos:
process_info.process.kill()
for process_info in process_infos:
process_info.process.wait()
# If the driver can reach the tearDown method, then it is still alive.
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
@@ -0,0 +1,183 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import os
import signal
import sys
import time
import pytest
import ray
import ray.ray_constants as ray_constants
from ray.cluster_utils import Cluster
from ray.test_utils import RayTestTimeoutException
@pytest.fixture(params=[(1, 4), (4, 4)])
def ray_start_workers_separate_multinode(request):
num_nodes = request.param[0]
num_initial_workers = request.param[1]
# Start the Ray processes.
cluster = Cluster()
for _ in range(num_nodes):
cluster.add_node(num_cpus=num_initial_workers)
ray.init(address=cluster.address)
yield num_nodes, num_initial_workers
# The code after the yield will run as teardown code.
ray.shutdown()
cluster.shutdown()
def test_worker_failed(ray_start_workers_separate_multinode):
num_nodes, num_initial_workers = (ray_start_workers_separate_multinode)
@ray.remote
def get_pids():
time.sleep(0.25)
return os.getpid()
start_time = time.time()
pids = set()
while len(pids) < num_nodes * num_initial_workers:
new_pids = ray.get([
get_pids.remote()
for _ in range(2 * num_nodes * num_initial_workers)
])
for pid in new_pids:
pids.add(pid)
if time.time() - start_time > 60:
raise RayTestTimeoutException(
"Timed out while waiting to get worker PIDs.")
@ray.remote
def f(x):
time.sleep(0.5)
return x
# Submit more tasks than there are workers so that all workers and
# cores are utilized.
object_ids = [f.remote(i) for i in range(num_initial_workers * num_nodes)]
object_ids += [f.remote(object_id) for object_id in object_ids]
# Allow the tasks some time to begin executing.
time.sleep(0.1)
# Kill the workers as the tasks execute.
for pid in pids:
os.kill(pid, signal.SIGKILL)
time.sleep(0.1)
# Make sure that we either get the object or we get an appropriate
# exception.
for object_id in object_ids:
try:
ray.get(object_id)
except (ray.exceptions.RayTaskError, ray.exceptions.RayWorkerError):
pass
def _test_component_failed(cluster, component_type):
"""Kill a component on all worker nodes and check workload succeeds."""
# Submit many tasks with many dependencies.
@ray.remote
def f(x):
return x
@ray.remote
def g(*xs):
return 1
# Kill the component on all nodes except the head node as the tasks
# execute. Do this in a loop while submitting tasks between each
# component failure.
time.sleep(0.1)
worker_nodes = cluster.list_all_nodes()[1:]
assert len(worker_nodes) > 0
for node in worker_nodes:
process = node.all_processes[component_type][0].process
# Submit a round of tasks with many dependencies.
x = 1
for _ in range(1000):
x = f.remote(x)
xs = [g.remote(1)]
for _ in range(100):
xs.append(g.remote(*xs))
xs.append(g.remote(1))
# Kill a component on one of the nodes.
process.terminate()
time.sleep(1)
process.kill()
process.wait()
assert not process.poll() is None
# Make sure that we can still get the objects after the
# executing tasks died.
ray.get(x)
ray.get(xs)
def check_components_alive(cluster, component_type, check_component_alive):
"""Check that a given component type is alive on all worker nodes."""
worker_nodes = cluster.list_all_nodes()[1:]
assert len(worker_nodes) > 0
for node in worker_nodes:
process = node.all_processes[component_type][0].process
if check_component_alive:
assert process.poll() is None
else:
print("waiting for " + component_type + " with PID " +
str(process.pid) + "to terminate")
process.wait()
print("done waiting for " + component_type + " with PID " +
str(process.pid) + "to terminate")
assert not process.poll() is None
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 8,
"num_nodes": 4,
"_internal_config": json.dumps({
"num_heartbeats_timeout": 100
}),
}],
indirect=True)
def test_raylet_failed(ray_start_cluster):
cluster = ray_start_cluster
# Kill all raylets on worker nodes.
_test_component_failed(cluster, ray_constants.PROCESS_TYPE_RAYLET)
# The plasma stores should still be alive on the worker nodes.
check_components_alive(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE,
True)
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 8,
"num_nodes": 2,
"_internal_config": json.dumps({
"num_heartbeats_timeout": 100
}),
}],
indirect=True)
def test_plasma_store_failed(ray_start_cluster):
cluster = ray_start_cluster
# Kill all plasma stores on worker nodes.
_test_component_failed(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE)
# No processes should be left alive on the worker nodes.
check_components_alive(cluster, ray_constants.PROCESS_TYPE_PLASMA_STORE,
False)
check_components_alive(cluster, ray_constants.PROCESS_TYPE_RAYLET, False)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
@@ -0,0 +1,107 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import sys
import time
import numpy as np
import pytest
import ray
import ray.ray_constants as ray_constants
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 4,
"num_nodes": 3,
"do_init": True
}],
indirect=True)
def test_actor_creation_node_failure(ray_start_cluster):
# TODO(swang): Refactor test_raylet_failed, etc to reuse the below code.
cluster = ray_start_cluster
@ray.remote
class Child(object):
def __init__(self, death_probability):
self.death_probability = death_probability
def ping(self):
# Exit process with some probability.
exit_chance = np.random.rand()
if exit_chance < self.death_probability:
sys.exit(-1)
num_children = 50
# Children actors will die about half the time.
death_probability = 0.5
children = [Child.remote(death_probability) for _ in range(num_children)]
while len(cluster.list_all_nodes()) > 1:
for j in range(2):
# Submit some tasks on the actors. About half of the actors will
# fail.
children_out = [child.ping.remote() for child in children]
# Wait a while for all the tasks to complete. This should trigger
# reconstruction for any actor creation tasks that were forwarded
# to nodes that then failed.
ready, _ = ray.wait(
children_out, num_returns=len(children_out), timeout=5 * 60.0)
assert len(ready) == len(children_out)
# Replace any actors that died.
for i, out in enumerate(children_out):
try:
ray.get(out)
except ray.exceptions.RayActorError:
children[i] = Child.remote(death_probability)
# Remove a node. Any actor creation tasks that were forwarded to this
# node must be reconstructed.
cluster.remove_node(cluster.list_all_nodes()[-1])
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_driver_lives_sequential(ray_start_regular):
ray.worker._global_node.kill_raylet()
ray.worker._global_node.kill_plasma_store()
ray.worker._global_node.kill_log_monitor()
ray.worker._global_node.kill_monitor()
ray.worker._global_node.kill_raylet_monitor()
# If the driver can reach the tearDown method, then it is still alive.
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_driver_lives_parallel(ray_start_regular):
all_processes = ray.worker._global_node.all_processes
process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] +
all_processes[ray_constants.PROCESS_TYPE_RAYLET] +
all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] +
all_processes[ray_constants.PROCESS_TYPE_MONITOR] +
all_processes[ray_constants.PROCESS_TYPE_RAYLET_MONITOR])
assert len(process_infos) == 5
# Kill all the components in parallel.
for process_info in process_infos:
process_info.process.terminate()
time.sleep(0.1)
for process_info in process_infos:
process_info.process.kill()
for process_info in process_infos:
process_info.process.wait()
# If the driver can reach the tearDown method, then it is still alive.
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
+2 -2
View File
@@ -23,7 +23,7 @@ from ray.test_utils import (
RayTestTimeoutException,
)
RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT"))
RAY_FORCE_DIRECT = ray_constants.direct_call_enabled()
def test_failed_task(ray_start_regular):
@@ -117,7 +117,7 @@ def temporary_helper_function():
wait_for_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR, 2)
errors = relevant_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR)
assert len(errors) == 2
assert len(errors) >= 2, errors
assert "No module named" in errors[0]["message"]
assert "No module named" in errors[1]["message"]
+1 -1
View File
@@ -15,7 +15,7 @@ class LightActor(object):
pass
def sample(self):
return "tiny_return_value"
return np.zeros(1 * MB, dtype=np.uint8)
@ray.remote
+4
View File
@@ -8,6 +8,7 @@ import subprocess
import time
import ray
from ray import ray_constants
from ray.test_utils import (
RayTestTimeoutException,
run_string_as_driver,
@@ -483,6 +484,9 @@ print("success")
assert "success" in out
@pytest.mark.skipif(
ray_constants.direct_call_enabled(),
reason="fate sharing not implemented yet")
def test_driver_exiting_when_worker_blocked(call_ray_start):
# This test will create some drivers that submit some tasks and then
# exit without waiting for the tasks to complete.
-37
View File
@@ -154,43 +154,6 @@ def test_heartbeats_single(ray_start_cluster_head):
ray.get(work_handle)
@pytest.mark.flaky(reruns=4)
def test_heartbeats_cluster(ray_start_cluster_head):
"""Unit test for `Cluster.wait_for_nodes`.
Test proper metrics.
"""
cluster = ray_start_cluster_head
timeout = 8
num_workers_nodes = 3
num_nodes_total = int(num_workers_nodes + 1)
[cluster.add_node() for i in range(num_workers_nodes)]
cluster.wait_for_nodes()
monitor = setup_monitor(cluster.address)
verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": num_nodes_total}))
@ray.remote
class Actor(object):
def work(self, timeout):
time.sleep(timeout)
return True
test_actors = [Actor.remote() for i in range(num_nodes_total)]
work_handles = [actor.work.remote(timeout * 2) for actor in test_actors]
verify_load_metrics(monitor, (num_nodes_total, {
"CPU": num_nodes_total
}, {
"CPU": num_nodes_total
}))
ray.get(work_handles)
verify_load_metrics(monitor, (0.0, {"CPU": 0.0}, {"CPU": num_nodes_total}))
ray.shutdown()
def test_wait_for_nodes(ray_start_cluster_head):
"""Unit test for `Cluster.wait_for_nodes`.
+1 -158
View File
@@ -8,7 +8,6 @@ import signal
import sys
import time
import numpy as np
import pytest
import ray
@@ -16,7 +15,7 @@ import ray.ray_constants as ray_constants
from ray.cluster_utils import Cluster
from ray.test_utils import RayTestTimeoutException
RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT"))
RAY_FORCE_DIRECT = ray_constants.direct_call_enabled()
@pytest.fixture(params=[(1, 4), (4, 4)])
@@ -170,72 +169,6 @@ def test_raylet_failed(ray_start_cluster):
True)
@pytest.mark.skipif(
RAY_FORCE_DIRECT,
reason="No reconstruction for objects placed in plasma yet")
@pytest.mark.parametrize(
"ray_start_cluster",
[{
# Force at least one task per node.
"num_cpus": 1,
"num_nodes": 4,
"object_store_memory": 1000 * 1024 * 1024,
"_internal_config": json.dumps({
# Raylet codepath is not stable with a shorter timeout.
"num_heartbeats_timeout": 10 if RAY_FORCE_DIRECT else 100,
"object_manager_pull_timeout_ms": 1000,
"object_manager_push_timeout_ms": 1000,
"object_manager_repeated_push_delay_ms": 1000,
}),
}],
indirect=True)
def test_object_reconstruction(ray_start_cluster):
cluster = ray_start_cluster
# Submit tasks with dependencies in plasma.
@ray.remote
def large_value():
# Sleep for a bit to force tasks onto different nodes.
time.sleep(0.1)
return np.zeros(10 * 1024 * 1024)
@ray.remote
def g(x):
return
# Kill the component on all nodes except the head node as the tasks
# execute. Do this in a loop while submitting tasks between each
# component failure.
time.sleep(0.1)
worker_nodes = cluster.list_all_nodes()[1:]
assert len(worker_nodes) > 0
component_type = ray_constants.PROCESS_TYPE_RAYLET
for node in worker_nodes:
process = node.all_processes[component_type][0].process
# Submit a round of tasks with many dependencies.
num_tasks = len(worker_nodes)
xs = [large_value.remote() for _ in range(num_tasks)]
# Wait for the tasks to complete, then evict the objects from the local
# node.
for x in xs:
ray.get(x)
ray.internal.free([x], local_only=True)
# Kill a component on one of the nodes.
process.terminate()
time.sleep(1)
process.kill()
process.wait()
assert not process.poll() is None
# Make sure that we can still get the objects after the
# executing tasks died.
print("F", xs)
xs = [g.remote(x) for x in xs]
print("G", xs)
ray.get(xs)
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
@@ -261,96 +194,6 @@ def test_plasma_store_failed(ray_start_cluster):
check_components_alive(cluster, ray_constants.PROCESS_TYPE_RAYLET, False)
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no actor restart yet")
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 4,
"num_nodes": 3,
"do_init": True
}],
indirect=True)
def test_actor_creation_node_failure(ray_start_cluster):
# TODO(swang): Refactor test_raylet_failed, etc to reuse the below code.
cluster = ray_start_cluster
@ray.remote
class Child(object):
def __init__(self, death_probability):
self.death_probability = death_probability
def ping(self):
# Exit process with some probability.
exit_chance = np.random.rand()
if exit_chance < self.death_probability:
sys.exit(-1)
num_children = 50
# Children actors will die about half the time.
death_probability = 0.5
children = [Child.remote(death_probability) for _ in range(num_children)]
while len(cluster.list_all_nodes()) > 1:
for j in range(2):
# Submit some tasks on the actors. About half of the actors will
# fail.
children_out = [child.ping.remote() for child in children]
# Wait a while for all the tasks to complete. This should trigger
# reconstruction for any actor creation tasks that were forwarded
# to nodes that then failed.
ready, _ = ray.wait(
children_out, num_returns=len(children_out), timeout=5 * 60.0)
assert len(ready) == len(children_out)
# Replace any actors that died.
for i, out in enumerate(children_out):
try:
ray.get(out)
except ray.exceptions.RayActorError:
children[i] = Child.remote(death_probability)
# Remove a node. Any actor creation tasks that were forwarded to this
# node must be reconstructed.
cluster.remove_node(cluster.list_all_nodes()[-1])
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_driver_lives_sequential(ray_start_regular):
ray.worker._global_node.kill_raylet()
ray.worker._global_node.kill_plasma_store()
ray.worker._global_node.kill_log_monitor()
ray.worker._global_node.kill_monitor()
ray.worker._global_node.kill_raylet_monitor()
# If the driver can reach the tearDown method, then it is still alive.
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_driver_lives_parallel(ray_start_regular):
all_processes = ray.worker._global_node.all_processes
process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] +
all_processes[ray_constants.PROCESS_TYPE_RAYLET] +
all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] +
all_processes[ray_constants.PROCESS_TYPE_MONITOR] +
all_processes[ray_constants.PROCESS_TYPE_RAYLET_MONITOR])
assert len(process_infos) == 5
# Kill all the components in parallel.
for process_info in process_infos:
process_info.process.terminate()
time.sleep(0.1)
for process_info in process_infos:
process_info.process.kill()
for process_info in process_infos:
process_info.process.wait()
# If the driver can reach the tearDown method, then it is still alive.
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
@@ -0,0 +1,177 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import json
import os
import sys
import time
import numpy as np
import pytest
import ray
import ray.ray_constants as ray_constants
RAY_FORCE_DIRECT = ray_constants.direct_call_enabled()
@pytest.mark.skipif(
RAY_FORCE_DIRECT,
reason="No reconstruction for objects placed in plasma yet")
@pytest.mark.parametrize(
"ray_start_cluster",
[{
# Force at least one task per node.
"num_cpus": 1,
"num_nodes": 4,
"object_store_memory": 1000 * 1024 * 1024,
"_internal_config": json.dumps({
# Raylet codepath is not stable with a shorter timeout.
"num_heartbeats_timeout": 10 if RAY_FORCE_DIRECT else 100,
"object_manager_pull_timeout_ms": 1000,
"object_manager_push_timeout_ms": 1000,
"object_manager_repeated_push_delay_ms": 1000,
}),
}],
indirect=True)
def test_object_reconstruction(ray_start_cluster):
cluster = ray_start_cluster
# Submit tasks with dependencies in plasma.
@ray.remote
def large_value():
# Sleep for a bit to force tasks onto different nodes.
time.sleep(0.1)
return np.zeros(10 * 1024 * 1024)
@ray.remote
def g(x):
return
# Kill the component on all nodes except the head node as the tasks
# execute. Do this in a loop while submitting tasks between each
# component failure.
time.sleep(0.1)
worker_nodes = cluster.list_all_nodes()[1:]
assert len(worker_nodes) > 0
component_type = ray_constants.PROCESS_TYPE_RAYLET
for node in worker_nodes:
process = node.all_processes[component_type][0].process
# Submit a round of tasks with many dependencies.
num_tasks = len(worker_nodes)
xs = [large_value.remote() for _ in range(num_tasks)]
# Wait for the tasks to complete, then evict the objects from the local
# node.
for x in xs:
ray.get(x)
ray.internal.free([x], local_only=True)
# Kill a component on one of the nodes.
process.terminate()
time.sleep(1)
process.kill()
process.wait()
assert not process.poll() is None
# Make sure that we can still get the objects after the
# executing tasks died.
print("F", xs)
xs = [g.remote(x) for x in xs]
print("G", xs)
ray.get(xs)
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no actor restart yet")
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 4,
"num_nodes": 3,
"do_init": True
}],
indirect=True)
def test_actor_creation_node_failure(ray_start_cluster):
# TODO(swang): Refactor test_raylet_failed, etc to reuse the below code.
cluster = ray_start_cluster
@ray.remote
class Child(object):
def __init__(self, death_probability):
self.death_probability = death_probability
def ping(self):
# Exit process with some probability.
exit_chance = np.random.rand()
if exit_chance < self.death_probability:
sys.exit(-1)
num_children = 50
# Children actors will die about half the time.
death_probability = 0.5
children = [Child.remote(death_probability) for _ in range(num_children)]
while len(cluster.list_all_nodes()) > 1:
for j in range(2):
# Submit some tasks on the actors. About half of the actors will
# fail.
children_out = [child.ping.remote() for child in children]
# Wait a while for all the tasks to complete. This should trigger
# reconstruction for any actor creation tasks that were forwarded
# to nodes that then failed.
ready, _ = ray.wait(
children_out, num_returns=len(children_out), timeout=5 * 60.0)
assert len(ready) == len(children_out)
# Replace any actors that died.
for i, out in enumerate(children_out):
try:
ray.get(out)
except ray.exceptions.RayActorError:
children[i] = Child.remote(death_probability)
# Remove a node. Any actor creation tasks that were forwarded to this
# node must be reconstructed.
cluster.remove_node(cluster.list_all_nodes()[-1])
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_driver_lives_sequential(ray_start_regular):
ray.worker._global_node.kill_raylet()
ray.worker._global_node.kill_plasma_store()
ray.worker._global_node.kill_log_monitor()
ray.worker._global_node.kill_monitor()
ray.worker._global_node.kill_raylet_monitor()
# If the driver can reach the tearDown method, then it is still alive.
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
def test_driver_lives_parallel(ray_start_regular):
all_processes = ray.worker._global_node.all_processes
process_infos = (all_processes[ray_constants.PROCESS_TYPE_PLASMA_STORE] +
all_processes[ray_constants.PROCESS_TYPE_RAYLET] +
all_processes[ray_constants.PROCESS_TYPE_LOG_MONITOR] +
all_processes[ray_constants.PROCESS_TYPE_MONITOR] +
all_processes[ray_constants.PROCESS_TYPE_RAYLET_MONITOR])
assert len(process_infos) == 5
# Kill all the components in parallel.
for process_info in process_infos:
process_info.process.terminate()
time.sleep(0.1)
for process_info in process_infos:
process_info.process.kill()
for process_info in process_infos:
process_info.process.wait()
# If the driver can reach the tearDown method, then it is still alive.
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))
+3 -1
View File
@@ -11,6 +11,7 @@ import time
import warnings
import ray
from ray import ray_constants
from ray.cluster_utils import Cluster
# TODO(yuhguo): This test file requires a lot of CPU/memory, and
@@ -45,6 +46,7 @@ def ray_start_cluster_with_resource():
# This test is here to make sure that when we broadcast an object to a bunch of
# machines, we don't have too many excess object transfers.
@pytest.mark.skipif(ray_constants.direct_call_enabled(), reason="TODO(ekl)")
def test_object_broadcast(ray_start_cluster_with_resource):
cluster, num_nodes = ray_start_cluster_with_resource
@@ -235,7 +237,7 @@ def test_object_transfer_retry(ray_start_cluster):
# Transfer an object to warm up the object manager.
ray.get(f.remote(10**6))
x_ids = [f.remote(10**i) for i in [1, 2, 3, 4]]
x_ids = [f.remote(10**i) for i in [6]]
assert not any(
ray.worker.global_worker.core_worker.object_exists(x_id)
for x_id in x_ids)
+5
View File
@@ -1,5 +1,7 @@
import pytest
import time
from ray import ray_constants
import ray
import ray.experimental.signal as signal
@@ -274,6 +276,9 @@ def test_forget(ray_start_regular):
assert len(result_list) == count
@pytest.mark.skipif(
ray_constants.direct_call_enabled(),
reason="TODO(ekl): this requires reconstruction")
def test_signal_on_node_failure(two_node_cluster):
"""Test actor checkpointing on a remote node."""
@@ -6,6 +6,7 @@ import numpy as np
import unittest
import ray
from ray import ray_constants
class TestUnreconstructableErrors(unittest.TestCase):
@@ -27,6 +28,9 @@ class TestUnreconstructableErrors(unittest.TestCase):
lambda: ray.get(x_id))
def testLineageEvictedReconstructionFails(self):
if ray_constants.direct_call_enabled():
return # not relevant
@ray.remote
def f(data):
return 0
+24 -2
View File
@@ -50,12 +50,18 @@ py_test(
deps = [":tune_lib"],
)
py_test(
name = "test_experiment_analysis_mem",
size = "small",
srcs = ["tests/test_experiment_analysis_mem.py"],
deps = [":tune_lib"],
)
py_test(
name = "test_experiment",
size = "small",
srcs = ["tests/test_experiment.py"],
deps = [":tune_lib"],
flaky = 1,
)
py_test(
@@ -96,6 +102,22 @@ py_test(
tags = ["exclusive"],
)
py_test(
name = "test_trial_runner_2",
size = "medium",
srcs = ["tests/test_trial_runner_2.py"],
deps = [":tune_lib"],
tags = ["exclusive"],
)
py_test(
name = "test_trial_runner_3",
size = "medium",
srcs = ["tests/test_trial_runner_3.py"],
deps = [":tune_lib"],
tags = ["exclusive"],
)
py_test(
name = "test_var",
size = "small",
@@ -146,7 +168,7 @@ py_test(
py_test(
name = "test_tune_server",
size = "medium",
size = "small",
srcs = ["tests/test_tune_server.py"],
deps = [":tune_lib"],
tags = ["exclusive"],
+10 -8
View File
@@ -123,6 +123,8 @@ def test_trial_processed_after_node_failure(start_connected_emptyhead_cluster):
cluster.remove_node(node)
runner.step()
if not mock_process_failure.called:
runner.step()
assert mock_process_failure.called
@@ -259,11 +261,9 @@ def test_trial_migration(start_connected_emptyhead_cluster):
cluster.remove_node(node2)
cluster.wait_for_nodes()
runner.step() # Recovery step
assert t2.last_result["training_iteration"] == 2
for i in range(1):
if t2.status != Trial.TERMINATED:
runner.step()
assert t2.status == Trial.TERMINATED
assert t2.status == Trial.TERMINATED, runner.debug_string()
# Test recovery of trial that won't be checkpointed
t3 = Trial("__fake", **{"stopping_criterion": {"training_iteration": 3}})
@@ -274,7 +274,9 @@ def test_trial_migration(start_connected_emptyhead_cluster):
cluster.remove_node(node3)
cluster.wait_for_nodes()
runner.step() # Error handling step
assert t3.status == Trial.ERROR
if t3.status != Trial.ERROR:
runner.step()
assert t3.status == Trial.ERROR, runner.debug_string()
with pytest.raises(TuneError):
runner.step()
@@ -340,9 +342,9 @@ def test_migration_checkpoint_removal(start_connected_emptyhead_cluster):
runner.step() # Recovery step
for i in range(3):
runner.step()
assert t1.status == Trial.TERMINATED
if t1.status != Trial.TERMINATED:
runner.step()
assert t1.status == Trial.TERMINATED, runner.debug_string()
def test_cluster_down_simple(start_connected_cluster, tmpdir):
@@ -10,67 +10,10 @@ import os
import pandas as pd
import ray
from ray.tune import run, Trainable, sample_from, Analysis, grid_search
from ray.tune import run, sample_from
from ray.tune.examples.async_hyperband_example import MyTrainableClass
class ExperimentAnalysisInMemorySuite(unittest.TestCase):
def setUp(self):
class MockTrainable(Trainable):
def _setup(self, config):
self.id = config["id"]
self.idx = 0
self.scores_dict = {
0: [5, 0],
1: [4, 1],
2: [2, 8],
3: [9, 6],
4: [7, 3]
}
def _train(self):
val = self.scores_dict[self.id][self.idx]
self.idx += 1
return {"score": val}
def _save(self, checkpoint_dir):
pass
def _restore(self, checkpoint_path):
pass
self.MockTrainable = MockTrainable
ray.init(local_mode=False, num_cpus=1)
def tearDown(self):
shutil.rmtree(self.test_dir, ignore_errors=True)
ray.shutdown()
def testCompareTrials(self):
self.test_dir = tempfile.mkdtemp()
scores_all = [5, 4, 2, 9, 7, 0, 1, 8, 6, 3]
scores_last = scores_all[5:]
ea = run(
self.MockTrainable,
name="analysis_exp",
local_dir=self.test_dir,
stop={"training_iteration": 2},
num_samples=1,
config={"id": grid_search(list(range(5)))})
max_all = ea.get_best_trial("score",
"max").metric_analysis["score"]["max"]
min_all = ea.get_best_trial("score",
"min").metric_analysis["score"]["min"]
max_last = ea.get_best_trial("score", "max",
"last").metric_analysis["score"]["last"]
self.assertEqual(max_all, max(scores_all))
self.assertEqual(min_all, min(scores_all))
self.assertEqual(max_last, max(scores_last))
self.assertNotEqual(max_last, max(scores_all))
class ExperimentAnalysisSuite(unittest.TestCase):
def setUp(self):
ray.init(local_mode=False)
@@ -155,54 +98,6 @@ class ExperimentAnalysisSuite(unittest.TestCase):
self.assertEquals(df.shape[0], 1)
class AnalysisSuite(unittest.TestCase):
def setUp(self):
ray.init(local_mode=True)
self.test_dir = tempfile.mkdtemp()
self.num_samples = 10
self.metric = "episode_reward_mean"
self.run_test_exp(test_name="analysis_exp1")
self.run_test_exp(test_name="analysis_exp2")
def run_test_exp(self, test_name=None):
run(MyTrainableClass,
name=test_name,
local_dir=self.test_dir,
return_trials=False,
stop={"training_iteration": 1},
num_samples=self.num_samples,
config={
"width": sample_from(
lambda spec: 10 + int(90 * random.random())),
"height": sample_from(lambda spec: int(100 * random.random())),
})
def tearDown(self):
shutil.rmtree(self.test_dir, ignore_errors=True)
ray.shutdown()
def testDataframe(self):
analysis = Analysis(self.test_dir)
df = analysis.dataframe()
self.assertTrue(isinstance(df, pd.DataFrame))
self.assertEquals(df.shape[0], self.num_samples * 2)
def testBestLogdir(self):
analysis = Analysis(self.test_dir)
logdir = analysis.get_best_logdir(self.metric)
self.assertTrue(logdir.startswith(self.test_dir))
logdir2 = analysis.get_best_logdir(self.metric, mode="min")
self.assertTrue(logdir2.startswith(self.test_dir))
self.assertNotEquals(logdir, logdir2)
def testBestConfigIsLogdir(self):
analysis = Analysis(self.test_dir)
for metric, mode in [(self.metric, "min"), (self.metric, "max")]:
logdir = analysis.get_best_logdir(metric, mode=mode)
best_config = analysis.get_best_config(metric, mode=mode)
self.assertEquals(analysis.get_all_configs()[logdir], best_config)
if __name__ == "__main__":
import pytest
import sys
@@ -0,0 +1,124 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import unittest
import shutil
import tempfile
import random
import pandas as pd
import ray
from ray.tune import run, Trainable, sample_from, Analysis, grid_search
from ray.tune.examples.async_hyperband_example import MyTrainableClass
class ExperimentAnalysisInMemorySuite(unittest.TestCase):
def setUp(self):
class MockTrainable(Trainable):
def _setup(self, config):
self.id = config["id"]
self.idx = 0
self.scores_dict = {
0: [5, 0],
1: [4, 1],
2: [2, 8],
3: [9, 6],
4: [7, 3]
}
def _train(self):
val = self.scores_dict[self.id][self.idx]
self.idx += 1
return {"score": val}
def _save(self, checkpoint_dir):
pass
def _restore(self, checkpoint_path):
pass
self.MockTrainable = MockTrainable
ray.init(local_mode=False, num_cpus=1)
def tearDown(self):
shutil.rmtree(self.test_dir, ignore_errors=True)
ray.shutdown()
def testCompareTrials(self):
self.test_dir = tempfile.mkdtemp()
scores_all = [5, 4, 2, 9, 7, 0, 1, 8, 6, 3]
scores_last = scores_all[5:]
ea = run(
self.MockTrainable,
name="analysis_exp",
local_dir=self.test_dir,
stop={"training_iteration": 2},
num_samples=1,
config={"id": grid_search(list(range(5)))})
max_all = ea.get_best_trial("score",
"max").metric_analysis["score"]["max"]
min_all = ea.get_best_trial("score",
"min").metric_analysis["score"]["min"]
max_last = ea.get_best_trial("score", "max",
"last").metric_analysis["score"]["last"]
self.assertEqual(max_all, max(scores_all))
self.assertEqual(min_all, min(scores_all))
self.assertEqual(max_last, max(scores_last))
self.assertNotEqual(max_last, max(scores_all))
class AnalysisSuite(unittest.TestCase):
def setUp(self):
ray.init(local_mode=True)
self.test_dir = tempfile.mkdtemp()
self.num_samples = 10
self.metric = "episode_reward_mean"
self.run_test_exp(test_name="analysis_exp1")
self.run_test_exp(test_name="analysis_exp2")
def run_test_exp(self, test_name=None):
run(MyTrainableClass,
name=test_name,
local_dir=self.test_dir,
return_trials=False,
stop={"training_iteration": 1},
num_samples=self.num_samples,
config={
"width": sample_from(
lambda spec: 10 + int(90 * random.random())),
"height": sample_from(lambda spec: int(100 * random.random())),
})
def tearDown(self):
shutil.rmtree(self.test_dir, ignore_errors=True)
ray.shutdown()
def testDataframe(self):
analysis = Analysis(self.test_dir)
df = analysis.dataframe()
self.assertTrue(isinstance(df, pd.DataFrame))
self.assertEquals(df.shape[0], self.num_samples * 2)
def testBestLogdir(self):
analysis = Analysis(self.test_dir)
logdir = analysis.get_best_logdir(self.metric)
self.assertTrue(logdir.startswith(self.test_dir))
logdir2 = analysis.get_best_logdir(self.metric, mode="min")
self.assertTrue(logdir2.startswith(self.test_dir))
self.assertNotEquals(logdir, logdir2)
def testBestConfigIsLogdir(self):
analysis = Analysis(self.test_dir)
for metric, mode in [(self.metric, "min"), (self.metric, "max")]:
logdir = analysis.get_best_logdir(metric, mode=mode)
best_config = analysis.get_best_config(metric, mode=mode)
self.assertEquals(analysis.get_all_configs()[logdir], best_config)
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))
+1 -820
View File
@@ -2,10 +2,7 @@ from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import shutil
import sys
import tempfile
import unittest
import ray
@@ -15,39 +12,10 @@ from ray import tune
from ray.tune import TuneError, register_trainable
from ray.tune.ray_trial_executor import RayTrialExecutor
from ray.tune.schedulers import TrialScheduler, FIFOScheduler
from ray.tune.result import DONE
from ray.tune.registry import _global_registry, TRAINABLE_CLASS
from ray.tune.experiment import Experiment
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.resources import Resources, json_to_resources, resources_to_json
from ray.tune.resources import Resources
from ray.tune.suggest import BasicVariantGenerator
from ray.tune.suggest.suggestion import (_MockSuggestionAlgorithm,
SuggestionAlgorithm)
if sys.version_info >= (3, 3):
from unittest.mock import patch
else:
from mock import patch
def create_mock_components():
class _MockScheduler(FIFOScheduler):
errored_trials = []
def on_trial_error(self, trial_runner, trial):
self.errored_trials += [trial]
class _MockSearchAlg(BasicVariantGenerator):
errored_trials = []
def on_trial_complete(self, trial_id, error=False, **kwargs):
if error:
self.errored_trials += [trial_id]
searchalg = _MockSearchAlg()
scheduler = _MockScheduler()
return searchalg, scheduler
class TrialRunnerTest(unittest.TestCase):
@@ -317,794 +285,7 @@ class TrialRunnerTest(unittest.TestCase):
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(runner.trial_executor._committed_resources.cpu, 2)
def testErrorHandling(self):
ray.init(num_cpus=4, num_gpus=2)
runner = TrialRunner()
kwargs = {
"stopping_criterion": {
"training_iteration": 1
},
"resources": Resources(cpu=1, gpu=1),
}
_global_registry.register(TRAINABLE_CLASS, "asdf", None)
trials = [Trial("asdf", **kwargs), Trial("__fake", **kwargs)]
for t in trials:
runner.add_trial(t)
runner.step()
self.assertEqual(trials[0].status, Trial.ERROR)
self.assertEqual(trials[1].status, Trial.PENDING)
runner.step()
self.assertEqual(trials[0].status, Trial.ERROR)
self.assertEqual(trials[1].status, Trial.RUNNING)
def testThrowOnOverstep(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
runner.step()
self.assertRaises(TuneError, runner.step)
def testFailureRecoveryDisabled(self):
ray.init(num_cpus=1, num_gpus=1)
searchalg, scheduler = create_mock_components()
runner = TrialRunner(searchalg, scheduler=scheduler)
kwargs = {
"resources": Resources(cpu=1, gpu=1),
"checkpoint_freq": 1,
"max_failures": 0,
"config": {
"mock_error": True,
},
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.ERROR)
self.assertEqual(trials[0].num_failures, 1)
self.assertEqual(len(searchalg.errored_trials), 1)
self.assertEqual(len(scheduler.errored_trials), 1)
def testFailureRecoveryEnabled(self):
ray.init(num_cpus=1, num_gpus=1)
searchalg, scheduler = create_mock_components()
runner = TrialRunner(searchalg, scheduler=scheduler)
kwargs = {
"resources": Resources(cpu=1, gpu=1),
"checkpoint_freq": 1,
"max_failures": 1,
"config": {
"mock_error": True,
},
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(trials[0].num_failures, 1)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(len(searchalg.errored_trials), 0)
self.assertEqual(len(scheduler.errored_trials), 0)
def testFailureRecoveryNodeRemoval(self):
ray.init(num_cpus=1, num_gpus=1)
searchalg, scheduler = create_mock_components()
runner = TrialRunner(searchalg, scheduler=scheduler)
kwargs = {
"resources": Resources(cpu=1, gpu=1),
"checkpoint_freq": 1,
"max_failures": 1,
"config": {
"mock_error": True,
},
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
with patch("ray.cluster_resources") as resource_mock:
resource_mock.return_value = {"CPU": 1, "GPU": 1}
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
# Mimic a node failure
resource_mock.return_value = {"CPU": 0, "GPU": 0}
runner.step()
self.assertEqual(trials[0].status, Trial.PENDING)
self.assertEqual(trials[0].num_failures, 1)
self.assertEqual(len(searchalg.errored_trials), 0)
self.assertEqual(len(scheduler.errored_trials), 1)
def testFailureRecoveryMaxFailures(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
kwargs = {
"resources": Resources(cpu=1, gpu=1),
"checkpoint_freq": 1,
"max_failures": 2,
"config": {
"mock_error": True,
"persistent_error": True,
},
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(trials[0].num_failures, 1)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(trials[0].num_failures, 2)
runner.step()
self.assertEqual(trials[0].status, Trial.ERROR)
self.assertEqual(trials[0].num_failures, 3)
def testCheckpointing(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
kwargs = {
"stopping_criterion": {
"training_iteration": 1
},
"resources": Resources(cpu=1, gpu=1),
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(ray.get(trials[0].runner.set_info.remote(1)), 1)
path = runner.trial_executor.save(trials[0])
kwargs["restore_path"] = path
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.PENDING)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.RUNNING)
self.assertEqual(ray.get(trials[1].runner.get_info.remote()), 1)
self.addCleanup(os.remove, path)
def testRestoreMetricsAfterCheckpointing(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
kwargs = {
"resources": Resources(cpu=1, gpu=1),
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(ray.get(trials[0].runner.set_info.remote(1)), 1)
path = runner.trial_executor.save(trials[0])
runner.trial_executor.stop_trial(trials[0])
kwargs["restore_path"] = path
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[1].last_result["timesteps_since_restore"], 10)
self.assertEqual(trials[1].last_result["iterations_since_restore"], 1)
self.assertGreater(trials[1].last_result["time_since_restore"], 0)
runner.step()
self.assertEqual(trials[1].last_result["timesteps_since_restore"], 20)
self.assertEqual(trials[1].last_result["iterations_since_restore"], 2)
self.assertGreater(trials[1].last_result["time_since_restore"], 0)
self.addCleanup(os.remove, path)
def testCheckpointingAtEnd(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
kwargs = {
"stopping_criterion": {
"training_iteration": 2
},
"checkpoint_at_end": True,
"resources": Resources(cpu=1, gpu=1),
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
runner.step()
self.assertEqual(trials[0].last_result[DONE], True)
self.assertEqual(trials[0].has_checkpoint(), True)
def testResultDone(self):
"""Tests that last_result is marked `done` after trial is complete."""
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
kwargs = {
"stopping_criterion": {
"training_iteration": 2
},
"resources": Resources(cpu=1, gpu=1),
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertNotEqual(trials[0].last_result[DONE], True)
runner.step()
self.assertEqual(trials[0].last_result[DONE], True)
def testPauseThenResume(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
kwargs = {
"stopping_criterion": {
"training_iteration": 2
},
"resources": Resources(cpu=1, gpu=1),
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(ray.get(trials[0].runner.get_info.remote()), None)
self.assertEqual(ray.get(trials[0].runner.set_info.remote(1)), 1)
runner.trial_executor.pause_trial(trials[0])
self.assertEqual(trials[0].status, Trial.PAUSED)
runner.trial_executor.resume_trial(trials[0])
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(ray.get(trials[0].runner.get_info.remote()), 1)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
def testStepHook(self):
ray.init(num_cpus=4, num_gpus=2)
runner = TrialRunner()
def on_step_begin(self, trialrunner):
self._update_avail_resources()
cnt = self.pre_step if hasattr(self, "pre_step") else 0
setattr(self, "pre_step", cnt + 1)
def on_step_end(self, trialrunner):
cnt = self.pre_step if hasattr(self, "post_step") else 0
setattr(self, "post_step", 1 + cnt)
import types
runner.trial_executor.on_step_begin = types.MethodType(
on_step_begin, runner.trial_executor)
runner.trial_executor.on_step_end = types.MethodType(
on_step_end, runner.trial_executor)
kwargs = {
"stopping_criterion": {
"training_iteration": 5
},
"resources": Resources(cpu=1, gpu=1),
}
runner.add_trial(Trial("__fake", **kwargs))
runner.step()
self.assertEqual(runner.trial_executor.pre_step, 1)
self.assertEqual(runner.trial_executor.post_step, 1)
def testStopTrial(self):
ray.init(num_cpus=4, num_gpus=2)
runner = TrialRunner()
kwargs = {
"stopping_criterion": {
"training_iteration": 5
},
"resources": Resources(cpu=1, gpu=1),
}
trials = [
Trial("__fake", **kwargs),
Trial("__fake", **kwargs),
Trial("__fake", **kwargs),
Trial("__fake", **kwargs)
]
for t in trials:
runner.add_trial(t)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(trials[1].status, Trial.PENDING)
# Stop trial while running
runner.stop_trial(trials[0])
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.PENDING)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.RUNNING)
self.assertEqual(trials[-1].status, Trial.PENDING)
# Stop trial while pending
runner.stop_trial(trials[-1])
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.RUNNING)
self.assertEqual(trials[-1].status, Trial.TERMINATED)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.RUNNING)
self.assertEqual(trials[2].status, Trial.RUNNING)
self.assertEqual(trials[-1].status, Trial.TERMINATED)
def testSearchAlgNotification(self):
"""Checks notification of trial to the Search Algorithm."""
ray.init(num_cpus=4, num_gpus=2)
experiment_spec = {"run": "__fake", "stop": {"training_iteration": 2}}
experiments = [Experiment.from_json("test", experiment_spec)]
searcher = _MockSuggestionAlgorithm(max_concurrent=10)
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher)
runner.step()
trials = runner.get_trials()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(searcher.counter["result"], 1)
self.assertEqual(searcher.counter["complete"], 1)
def testSearchAlgFinished(self):
"""Checks that SearchAlg is Finished before all trials are done."""
ray.init(num_cpus=4, num_gpus=2)
experiment_spec = {"run": "__fake", "stop": {"training_iteration": 1}}
experiments = [Experiment.from_json("test", experiment_spec)]
searcher = _MockSuggestionAlgorithm(max_concurrent=10)
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher)
runner.step()
trials = runner.get_trials()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertTrue(searcher.is_finished())
self.assertFalse(runner.is_finished())
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(len(searcher.live_trials), 0)
self.assertTrue(searcher.is_finished())
self.assertTrue(runner.is_finished())
def testSearchAlgSchedulerInteraction(self):
"""Checks that TrialScheduler killing trial will notify SearchAlg."""
class _MockScheduler(FIFOScheduler):
def on_trial_result(self, *args, **kwargs):
return TrialScheduler.STOP
ray.init(num_cpus=4, num_gpus=2)
experiment_spec = {"run": "__fake", "stop": {"training_iteration": 2}}
experiments = [Experiment.from_json("test", experiment_spec)]
searcher = _MockSuggestionAlgorithm(max_concurrent=10)
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher, scheduler=_MockScheduler())
runner.step()
trials = runner.get_trials()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertTrue(searcher.is_finished())
self.assertFalse(runner.is_finished())
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(len(searcher.live_trials), 0)
self.assertTrue(searcher.is_finished())
self.assertTrue(runner.is_finished())
def testSearchAlgSchedulerEarlyStop(self):
"""Early termination notif to Searcher can be turned off."""
class _MockScheduler(FIFOScheduler):
def on_trial_result(self, *args, **kwargs):
return TrialScheduler.STOP
ray.init(num_cpus=4, num_gpus=2)
experiment_spec = {"run": "__fake", "stop": {"training_iteration": 2}}
experiments = [Experiment.from_json("test", experiment_spec)]
searcher = _MockSuggestionAlgorithm(use_early_stopped_trials=True)
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher, scheduler=_MockScheduler())
runner.step()
runner.step()
self.assertEqual(len(searcher.final_results), 1)
searcher = _MockSuggestionAlgorithm(use_early_stopped_trials=False)
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher, scheduler=_MockScheduler())
runner.step()
runner.step()
self.assertEqual(len(searcher.final_results), 0)
def testSearchAlgStalled(self):
"""Checks that runner and searcher state is maintained when stalled."""
ray.init(num_cpus=4, num_gpus=2)
experiment_spec = {
"run": "__fake",
"num_samples": 3,
"stop": {
"training_iteration": 1
}
}
experiments = [Experiment.from_json("test", experiment_spec)]
searcher = _MockSuggestionAlgorithm(max_concurrent=1)
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher)
runner.step()
trials = runner.get_trials()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[1].status, Trial.RUNNING)
self.assertEqual(len(searcher.live_trials), 1)
searcher.stall = True
runner.step()
self.assertEqual(trials[1].status, Trial.TERMINATED)
self.assertEqual(len(searcher.live_trials), 0)
self.assertTrue(all(trial.is_finished() for trial in trials))
self.assertFalse(searcher.is_finished())
self.assertFalse(runner.is_finished())
searcher.stall = False
runner.step()
trials = runner.get_trials()
self.assertEqual(trials[2].status, Trial.RUNNING)
self.assertEqual(len(searcher.live_trials), 1)
runner.step()
self.assertEqual(trials[2].status, Trial.TERMINATED)
self.assertEqual(len(searcher.live_trials), 0)
self.assertTrue(searcher.is_finished())
self.assertTrue(runner.is_finished())
def testSearchAlgFinishes(self):
"""Empty SearchAlg changing state in `next_trials` does not crash."""
class FinishFastAlg(SuggestionAlgorithm):
_index = 0
def next_trials(self):
trials = []
self._index += 1
for trial in self._trial_generator:
trials += [trial]
break
if self._index > 4:
self._finished = True
return trials
def _suggest(self, trial_id):
return {}
ray.init(num_cpus=2)
experiment_spec = {
"run": "__fake",
"num_samples": 2,
"stop": {
"training_iteration": 1
}
}
searcher = FinishFastAlg()
experiments = [Experiment.from_json("test", experiment_spec)]
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher)
self.assertFalse(runner.is_finished())
runner.step() # This launches a new run
runner.step() # This launches a 2nd run
self.assertFalse(searcher.is_finished())
self.assertFalse(runner.is_finished())
runner.step() # This kills the first run
self.assertFalse(searcher.is_finished())
self.assertFalse(runner.is_finished())
runner.step() # This kills the 2nd run
self.assertFalse(searcher.is_finished())
self.assertFalse(runner.is_finished())
runner.step() # this converts self._finished to True
self.assertTrue(searcher.is_finished())
self.assertRaises(TuneError, runner.step)
def testTrialSaveRestore(self):
"""Creates different trials to test runner.checkpoint/restore."""
ray.init(num_cpus=3)
tmpdir = tempfile.mkdtemp()
runner = TrialRunner(local_checkpoint_dir=tmpdir, checkpoint_period=0)
trials = [
Trial(
"__fake",
trial_id="trial_terminate",
stopping_criterion={"training_iteration": 1},
checkpoint_freq=1)
]
runner.add_trial(trials[0])
runner.step() # start
runner.step()
self.assertEquals(trials[0].status, Trial.TERMINATED)
trials += [
Trial(
"__fake",
trial_id="trial_fail",
stopping_criterion={"training_iteration": 3},
checkpoint_freq=1,
config={"mock_error": True})
]
runner.add_trial(trials[1])
runner.step()
runner.step()
runner.step()
self.assertEquals(trials[1].status, Trial.ERROR)
trials += [
Trial(
"__fake",
trial_id="trial_succ",
stopping_criterion={"training_iteration": 2},
checkpoint_freq=1)
]
runner.add_trial(trials[2])
runner.step()
self.assertEquals(len(runner.trial_executor.get_checkpoints()), 3)
self.assertEquals(trials[2].status, Trial.RUNNING)
runner2 = TrialRunner(resume="LOCAL", local_checkpoint_dir=tmpdir)
for tid in ["trial_terminate", "trial_fail"]:
original_trial = runner.get_trial(tid)
restored_trial = runner2.get_trial(tid)
self.assertEqual(original_trial.status, restored_trial.status)
restored_trial = runner2.get_trial("trial_succ")
self.assertEqual(Trial.PENDING, restored_trial.status)
runner2.step()
runner2.step()
runner2.step()
self.assertRaises(TuneError, runner2.step)
shutil.rmtree(tmpdir)
def testTrialNoSave(self):
"""Check that non-checkpointing trials are not saved."""
ray.init(num_cpus=3)
tmpdir = tempfile.mkdtemp()
runner = TrialRunner(local_checkpoint_dir=tmpdir, checkpoint_period=0)
runner.add_trial(
Trial(
"__fake",
trial_id="non_checkpoint",
stopping_criterion={"training_iteration": 2}))
while not all(t.status == Trial.TERMINATED
for t in runner.get_trials()):
runner.step()
runner.add_trial(
Trial(
"__fake",
trial_id="checkpoint",
checkpoint_at_end=True,
stopping_criterion={"training_iteration": 2}))
while not all(t.status == Trial.TERMINATED
for t in runner.get_trials()):
runner.step()
runner.add_trial(
Trial(
"__fake",
trial_id="pending",
stopping_criterion={"training_iteration": 2}))
runner.step()
runner.step()
runner2 = TrialRunner(resume="LOCAL", local_checkpoint_dir=tmpdir)
new_trials = runner2.get_trials()
self.assertEquals(len(new_trials), 3)
self.assertTrue(
runner2.get_trial("non_checkpoint").status == Trial.TERMINATED)
self.assertTrue(
runner2.get_trial("checkpoint").status == Trial.TERMINATED)
self.assertTrue(runner2.get_trial("pending").status == Trial.PENDING)
self.assertTrue(not runner2.get_trial("pending").last_result)
runner2.step()
shutil.rmtree(tmpdir)
def testCheckpointWithFunction(self):
ray.init()
trial = Trial(
"__fake",
config={"callbacks": {
"on_episode_start": lambda i: i,
}},
checkpoint_freq=1)
tmpdir = tempfile.mkdtemp()
runner = TrialRunner(local_checkpoint_dir=tmpdir, checkpoint_period=0)
runner.add_trial(trial)
for i in range(5):
runner.step()
# force checkpoint
runner.checkpoint()
runner2 = TrialRunner(resume="LOCAL", local_checkpoint_dir=tmpdir)
new_trial = runner2.get_trials()[0]
self.assertTrue("callbacks" in new_trial.config)
self.assertTrue("on_episode_start" in new_trial.config["callbacks"])
shutil.rmtree(tmpdir)
def testCheckpointOverwrite(self):
def count_checkpoints(cdir):
return sum((fname.startswith("experiment_state")
and fname.endswith(".json"))
for fname in os.listdir(cdir))
ray.init()
trial = Trial("__fake", checkpoint_freq=1)
tmpdir = tempfile.mkdtemp()
runner = TrialRunner(local_checkpoint_dir=tmpdir, checkpoint_period=0)
runner.add_trial(trial)
for i in range(5):
runner.step()
# force checkpoint
runner.checkpoint()
self.assertEquals(count_checkpoints(tmpdir), 1)
runner2 = TrialRunner(resume="LOCAL", local_checkpoint_dir=tmpdir)
for i in range(5):
runner2.step()
self.assertEquals(count_checkpoints(tmpdir), 2)
runner2.checkpoint()
self.assertEquals(count_checkpoints(tmpdir), 2)
shutil.rmtree(tmpdir)
def testUserCheckpoint(self):
ray.init(num_cpus=3)
tmpdir = tempfile.mkdtemp()
runner = TrialRunner(local_checkpoint_dir=tmpdir, checkpoint_period=0)
runner.add_trial(Trial("__fake", config={"user_checkpoint_freq": 2}))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(ray.get(trials[0].runner.set_info.remote(1)), 1)
runner.step() # 0
self.assertFalse(trials[0].has_checkpoint())
runner.step() # 1
self.assertFalse(trials[0].has_checkpoint())
runner.step() # 2
self.assertTrue(trials[0].has_checkpoint())
runner2 = TrialRunner(resume="LOCAL", local_checkpoint_dir=tmpdir)
runner2.step()
trials2 = runner2.get_trials()
self.assertEqual(ray.get(trials2[0].runner.get_info.remote()), 1)
shutil.rmtree(tmpdir)
class SearchAlgorithmTest(unittest.TestCase):
def testNestedSuggestion(self):
class TestSuggestion(SuggestionAlgorithm):
def _suggest(self, trial_id):
return {"a": {"b": {"c": {"d": 4, "e": 5}}}}
alg = TestSuggestion()
alg.add_configurations({"test": {"run": "__fake"}})
trial = alg.next_trials()[0]
self.assertTrue("e=5" in trial.experiment_tag)
self.assertTrue("d=4" in trial.experiment_tag)
class ResourcesTest(unittest.TestCase):
def testSubtraction(self):
resource_1 = Resources(
1,
0,
0,
1,
custom_resources={
"a": 1,
"b": 2
},
extra_custom_resources={
"a": 1,
"b": 1
})
resource_2 = Resources(
1,
0,
0,
1,
custom_resources={
"a": 1,
"b": 2
},
extra_custom_resources={
"a": 1,
"b": 1
})
new_res = Resources.subtract(resource_1, resource_2)
self.assertTrue(new_res.cpu == 0)
self.assertTrue(new_res.gpu == 0)
self.assertTrue(new_res.extra_cpu == 0)
self.assertTrue(new_res.extra_gpu == 0)
self.assertTrue(all(k == 0 for k in new_res.custom_resources.values()))
self.assertTrue(
all(k == 0 for k in new_res.extra_custom_resources.values()))
def testDifferentResources(self):
resource_1 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
resource_2 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "c": 2})
new_res = Resources.subtract(resource_1, resource_2)
assert "c" in new_res.custom_resources
assert "b" in new_res.custom_resources
self.assertTrue(new_res.cpu == 0)
self.assertTrue(new_res.gpu == 0)
self.assertTrue(new_res.extra_cpu == 0)
self.assertTrue(new_res.extra_gpu == 0)
self.assertTrue(new_res.get("a") == 0)
def testSerialization(self):
original = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
jsoned = resources_to_json(original)
new_resource = json_to_resources(jsoned)
self.assertEquals(original, new_resource)
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))
@@ -0,0 +1,334 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import sys
import unittest
import ray
from ray.rllib import _register_all
from ray.tune import TuneError
from ray.tune.schedulers import FIFOScheduler
from ray.tune.result import DONE
from ray.tune.registry import _global_registry, TRAINABLE_CLASS
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.resources import Resources
from ray.tune.suggest import BasicVariantGenerator
if sys.version_info >= (3, 3):
from unittest.mock import patch
else:
from mock import patch
def create_mock_components():
class _MockScheduler(FIFOScheduler):
errored_trials = []
def on_trial_error(self, trial_runner, trial):
self.errored_trials += [trial]
class _MockSearchAlg(BasicVariantGenerator):
errored_trials = []
def on_trial_complete(self, trial_id, error=False, **kwargs):
if error:
self.errored_trials += [trial_id]
searchalg = _MockSearchAlg()
scheduler = _MockScheduler()
return searchalg, scheduler
class TrialRunnerTest2(unittest.TestCase):
def tearDown(self):
ray.shutdown()
_register_all() # re-register the evicted objects
def testErrorHandling(self):
ray.init(num_cpus=4, num_gpus=2)
runner = TrialRunner()
kwargs = {
"stopping_criterion": {
"training_iteration": 1
},
"resources": Resources(cpu=1, gpu=1),
}
_global_registry.register(TRAINABLE_CLASS, "asdf", None)
trials = [Trial("asdf", **kwargs), Trial("__fake", **kwargs)]
for t in trials:
runner.add_trial(t)
runner.step()
self.assertEqual(trials[0].status, Trial.ERROR)
self.assertEqual(trials[1].status, Trial.PENDING)
runner.step()
self.assertEqual(trials[0].status, Trial.ERROR)
self.assertEqual(trials[1].status, Trial.RUNNING)
def testThrowOnOverstep(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
runner.step()
self.assertRaises(TuneError, runner.step)
def testFailureRecoveryDisabled(self):
ray.init(num_cpus=1, num_gpus=1)
searchalg, scheduler = create_mock_components()
runner = TrialRunner(searchalg, scheduler=scheduler)
kwargs = {
"resources": Resources(cpu=1, gpu=1),
"checkpoint_freq": 1,
"max_failures": 0,
"config": {
"mock_error": True,
},
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.ERROR)
self.assertEqual(trials[0].num_failures, 1)
self.assertEqual(len(searchalg.errored_trials), 1)
self.assertEqual(len(scheduler.errored_trials), 1)
def testFailureRecoveryEnabled(self):
ray.init(num_cpus=1, num_gpus=1)
searchalg, scheduler = create_mock_components()
runner = TrialRunner(searchalg, scheduler=scheduler)
kwargs = {
"resources": Resources(cpu=1, gpu=1),
"checkpoint_freq": 1,
"max_failures": 1,
"config": {
"mock_error": True,
},
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(trials[0].num_failures, 1)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(len(searchalg.errored_trials), 0)
self.assertEqual(len(scheduler.errored_trials), 0)
def testFailureRecoveryNodeRemoval(self):
ray.init(num_cpus=1, num_gpus=1)
searchalg, scheduler = create_mock_components()
runner = TrialRunner(searchalg, scheduler=scheduler)
kwargs = {
"resources": Resources(cpu=1, gpu=1),
"checkpoint_freq": 1,
"max_failures": 1,
"config": {
"mock_error": True,
},
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
with patch("ray.cluster_resources") as resource_mock:
resource_mock.return_value = {"CPU": 1, "GPU": 1}
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
# Mimic a node failure
resource_mock.return_value = {"CPU": 0, "GPU": 0}
runner.step()
self.assertEqual(trials[0].status, Trial.PENDING)
self.assertEqual(trials[0].num_failures, 1)
self.assertEqual(len(searchalg.errored_trials), 0)
self.assertEqual(len(scheduler.errored_trials), 1)
def testFailureRecoveryMaxFailures(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
kwargs = {
"resources": Resources(cpu=1, gpu=1),
"checkpoint_freq": 1,
"max_failures": 2,
"config": {
"mock_error": True,
"persistent_error": True,
},
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(trials[0].num_failures, 1)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(trials[0].num_failures, 2)
runner.step()
self.assertEqual(trials[0].status, Trial.ERROR)
self.assertEqual(trials[0].num_failures, 3)
def testCheckpointing(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
kwargs = {
"stopping_criterion": {
"training_iteration": 1
},
"resources": Resources(cpu=1, gpu=1),
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(ray.get(trials[0].runner.set_info.remote(1)), 1)
path = runner.trial_executor.save(trials[0])
kwargs["restore_path"] = path
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.PENDING)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.RUNNING)
self.assertEqual(ray.get(trials[1].runner.get_info.remote()), 1)
self.addCleanup(os.remove, path)
def testRestoreMetricsAfterCheckpointing(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
kwargs = {
"resources": Resources(cpu=1, gpu=1),
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(ray.get(trials[0].runner.set_info.remote(1)), 1)
path = runner.trial_executor.save(trials[0])
runner.trial_executor.stop_trial(trials[0])
kwargs["restore_path"] = path
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[1].last_result["timesteps_since_restore"], 10)
self.assertEqual(trials[1].last_result["iterations_since_restore"], 1)
self.assertGreater(trials[1].last_result["time_since_restore"], 0)
runner.step()
self.assertEqual(trials[1].last_result["timesteps_since_restore"], 20)
self.assertEqual(trials[1].last_result["iterations_since_restore"], 2)
self.assertGreater(trials[1].last_result["time_since_restore"], 0)
self.addCleanup(os.remove, path)
def testCheckpointingAtEnd(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
kwargs = {
"stopping_criterion": {
"training_iteration": 2
},
"checkpoint_at_end": True,
"resources": Resources(cpu=1, gpu=1),
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
runner.step()
self.assertEqual(trials[0].last_result[DONE], True)
self.assertEqual(trials[0].has_checkpoint(), True)
def testResultDone(self):
"""Tests that last_result is marked `done` after trial is complete."""
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
kwargs = {
"stopping_criterion": {
"training_iteration": 2
},
"resources": Resources(cpu=1, gpu=1),
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertNotEqual(trials[0].last_result[DONE], True)
runner.step()
self.assertEqual(trials[0].last_result[DONE], True)
def testPauseThenResume(self):
ray.init(num_cpus=1, num_gpus=1)
runner = TrialRunner()
kwargs = {
"stopping_criterion": {
"training_iteration": 2
},
"resources": Resources(cpu=1, gpu=1),
}
runner.add_trial(Trial("__fake", **kwargs))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(ray.get(trials[0].runner.get_info.remote()), None)
self.assertEqual(ray.get(trials[0].runner.set_info.remote(1)), 1)
runner.trial_executor.pause_trial(trials[0])
self.assertEqual(trials[0].status, Trial.PAUSED)
runner.trial_executor.resume_trial(trials[0])
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(ray.get(trials[0].runner.get_info.remote()), 1)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
if __name__ == "__main__":
import pytest
import sys
sys.exit(pytest.main(["-v", __file__]))
@@ -0,0 +1,539 @@
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import os
import shutil
import sys
import tempfile
import unittest
import ray
from ray.rllib import _register_all
from ray.tune import TuneError
from ray.tune.schedulers import TrialScheduler, FIFOScheduler
from ray.tune.experiment import Experiment
from ray.tune.trial import Trial
from ray.tune.trial_runner import TrialRunner
from ray.tune.resources import Resources, json_to_resources, resources_to_json
from ray.tune.suggest.suggestion import (_MockSuggestionAlgorithm,
SuggestionAlgorithm)
class TrialRunnerTest3(unittest.TestCase):
def tearDown(self):
ray.shutdown()
_register_all() # re-register the evicted objects
def testStepHook(self):
ray.init(num_cpus=4, num_gpus=2)
runner = TrialRunner()
def on_step_begin(self, trialrunner):
self._update_avail_resources()
cnt = self.pre_step if hasattr(self, "pre_step") else 0
setattr(self, "pre_step", cnt + 1)
def on_step_end(self, trialrunner):
cnt = self.pre_step if hasattr(self, "post_step") else 0
setattr(self, "post_step", 1 + cnt)
import types
runner.trial_executor.on_step_begin = types.MethodType(
on_step_begin, runner.trial_executor)
runner.trial_executor.on_step_end = types.MethodType(
on_step_end, runner.trial_executor)
kwargs = {
"stopping_criterion": {
"training_iteration": 5
},
"resources": Resources(cpu=1, gpu=1),
}
runner.add_trial(Trial("__fake", **kwargs))
runner.step()
self.assertEqual(runner.trial_executor.pre_step, 1)
self.assertEqual(runner.trial_executor.post_step, 1)
def testStopTrial(self):
ray.init(num_cpus=4, num_gpus=2)
runner = TrialRunner()
kwargs = {
"stopping_criterion": {
"training_iteration": 5
},
"resources": Resources(cpu=1, gpu=1),
}
trials = [
Trial("__fake", **kwargs),
Trial("__fake", **kwargs),
Trial("__fake", **kwargs),
Trial("__fake", **kwargs)
]
for t in trials:
runner.add_trial(t)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(trials[1].status, Trial.PENDING)
# Stop trial while running
runner.stop_trial(trials[0])
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.PENDING)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.RUNNING)
self.assertEqual(trials[-1].status, Trial.PENDING)
# Stop trial while pending
runner.stop_trial(trials[-1])
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.RUNNING)
self.assertEqual(trials[-1].status, Trial.TERMINATED)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(trials[1].status, Trial.RUNNING)
self.assertEqual(trials[2].status, Trial.RUNNING)
self.assertEqual(trials[-1].status, Trial.TERMINATED)
def testSearchAlgNotification(self):
"""Checks notification of trial to the Search Algorithm."""
ray.init(num_cpus=4, num_gpus=2)
experiment_spec = {"run": "__fake", "stop": {"training_iteration": 2}}
experiments = [Experiment.from_json("test", experiment_spec)]
searcher = _MockSuggestionAlgorithm(max_concurrent=10)
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher)
runner.step()
trials = runner.get_trials()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(searcher.counter["result"], 1)
self.assertEqual(searcher.counter["complete"], 1)
def testSearchAlgFinished(self):
"""Checks that SearchAlg is Finished before all trials are done."""
ray.init(num_cpus=4, num_gpus=2)
experiment_spec = {"run": "__fake", "stop": {"training_iteration": 1}}
experiments = [Experiment.from_json("test", experiment_spec)]
searcher = _MockSuggestionAlgorithm(max_concurrent=10)
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher)
runner.step()
trials = runner.get_trials()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertTrue(searcher.is_finished())
self.assertFalse(runner.is_finished())
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(len(searcher.live_trials), 0)
self.assertTrue(searcher.is_finished())
self.assertTrue(runner.is_finished())
def testSearchAlgSchedulerInteraction(self):
"""Checks that TrialScheduler killing trial will notify SearchAlg."""
class _MockScheduler(FIFOScheduler):
def on_trial_result(self, *args, **kwargs):
return TrialScheduler.STOP
ray.init(num_cpus=4, num_gpus=2)
experiment_spec = {"run": "__fake", "stop": {"training_iteration": 2}}
experiments = [Experiment.from_json("test", experiment_spec)]
searcher = _MockSuggestionAlgorithm(max_concurrent=10)
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher, scheduler=_MockScheduler())
runner.step()
trials = runner.get_trials()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertTrue(searcher.is_finished())
self.assertFalse(runner.is_finished())
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
self.assertEqual(len(searcher.live_trials), 0)
self.assertTrue(searcher.is_finished())
self.assertTrue(runner.is_finished())
def testSearchAlgSchedulerEarlyStop(self):
"""Early termination notif to Searcher can be turned off."""
class _MockScheduler(FIFOScheduler):
def on_trial_result(self, *args, **kwargs):
return TrialScheduler.STOP
ray.init(num_cpus=4, num_gpus=2)
experiment_spec = {"run": "__fake", "stop": {"training_iteration": 2}}
experiments = [Experiment.from_json("test", experiment_spec)]
searcher = _MockSuggestionAlgorithm(use_early_stopped_trials=True)
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher, scheduler=_MockScheduler())
runner.step()
runner.step()
self.assertEqual(len(searcher.final_results), 1)
searcher = _MockSuggestionAlgorithm(use_early_stopped_trials=False)
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher, scheduler=_MockScheduler())
runner.step()
runner.step()
self.assertEqual(len(searcher.final_results), 0)
def testSearchAlgStalled(self):
"""Checks that runner and searcher state is maintained when stalled."""
ray.init(num_cpus=4, num_gpus=2)
experiment_spec = {
"run": "__fake",
"num_samples": 3,
"stop": {
"training_iteration": 1
}
}
experiments = [Experiment.from_json("test", experiment_spec)]
searcher = _MockSuggestionAlgorithm(max_concurrent=1)
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher)
runner.step()
trials = runner.get_trials()
self.assertEqual(trials[0].status, Trial.RUNNING)
runner.step()
self.assertEqual(trials[0].status, Trial.TERMINATED)
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[1].status, Trial.RUNNING)
self.assertEqual(len(searcher.live_trials), 1)
searcher.stall = True
runner.step()
self.assertEqual(trials[1].status, Trial.TERMINATED)
self.assertEqual(len(searcher.live_trials), 0)
self.assertTrue(all(trial.is_finished() for trial in trials))
self.assertFalse(searcher.is_finished())
self.assertFalse(runner.is_finished())
searcher.stall = False
runner.step()
trials = runner.get_trials()
self.assertEqual(trials[2].status, Trial.RUNNING)
self.assertEqual(len(searcher.live_trials), 1)
runner.step()
self.assertEqual(trials[2].status, Trial.TERMINATED)
self.assertEqual(len(searcher.live_trials), 0)
self.assertTrue(searcher.is_finished())
self.assertTrue(runner.is_finished())
def testSearchAlgFinishes(self):
"""Empty SearchAlg changing state in `next_trials` does not crash."""
class FinishFastAlg(SuggestionAlgorithm):
_index = 0
def next_trials(self):
trials = []
self._index += 1
for trial in self._trial_generator:
trials += [trial]
break
if self._index > 4:
self._finished = True
return trials
def _suggest(self, trial_id):
return {}
ray.init(num_cpus=2)
experiment_spec = {
"run": "__fake",
"num_samples": 2,
"stop": {
"training_iteration": 1
}
}
searcher = FinishFastAlg()
experiments = [Experiment.from_json("test", experiment_spec)]
searcher.add_configurations(experiments)
runner = TrialRunner(search_alg=searcher)
self.assertFalse(runner.is_finished())
runner.step() # This launches a new run
runner.step() # This launches a 2nd run
self.assertFalse(searcher.is_finished())
self.assertFalse(runner.is_finished())
runner.step() # This kills the first run
self.assertFalse(searcher.is_finished())
self.assertFalse(runner.is_finished())
runner.step() # This kills the 2nd run
self.assertFalse(searcher.is_finished())
self.assertFalse(runner.is_finished())
runner.step() # this converts self._finished to True
self.assertTrue(searcher.is_finished())
self.assertRaises(TuneError, runner.step)
def testTrialSaveRestore(self):
"""Creates different trials to test runner.checkpoint/restore."""
ray.init(num_cpus=3)
tmpdir = tempfile.mkdtemp()
runner = TrialRunner(local_checkpoint_dir=tmpdir, checkpoint_period=0)
trials = [
Trial(
"__fake",
trial_id="trial_terminate",
stopping_criterion={"training_iteration": 1},
checkpoint_freq=1)
]
runner.add_trial(trials[0])
runner.step() # start
runner.step()
self.assertEquals(trials[0].status, Trial.TERMINATED)
trials += [
Trial(
"__fake",
trial_id="trial_fail",
stopping_criterion={"training_iteration": 3},
checkpoint_freq=1,
config={"mock_error": True})
]
runner.add_trial(trials[1])
runner.step()
runner.step()
runner.step()
self.assertEquals(trials[1].status, Trial.ERROR)
trials += [
Trial(
"__fake",
trial_id="trial_succ",
stopping_criterion={"training_iteration": 2},
checkpoint_freq=1)
]
runner.add_trial(trials[2])
runner.step()
self.assertEquals(len(runner.trial_executor.get_checkpoints()), 3)
self.assertEquals(trials[2].status, Trial.RUNNING)
runner2 = TrialRunner(resume="LOCAL", local_checkpoint_dir=tmpdir)
for tid in ["trial_terminate", "trial_fail"]:
original_trial = runner.get_trial(tid)
restored_trial = runner2.get_trial(tid)
self.assertEqual(original_trial.status, restored_trial.status)
restored_trial = runner2.get_trial("trial_succ")
self.assertEqual(Trial.PENDING, restored_trial.status)
runner2.step()
runner2.step()
runner2.step()
self.assertRaises(TuneError, runner2.step)
shutil.rmtree(tmpdir)
def testTrialNoSave(self):
"""Check that non-checkpointing trials are not saved."""
ray.init(num_cpus=3)
tmpdir = tempfile.mkdtemp()
runner = TrialRunner(local_checkpoint_dir=tmpdir, checkpoint_period=0)
runner.add_trial(
Trial(
"__fake",
trial_id="non_checkpoint",
stopping_criterion={"training_iteration": 2}))
while not all(t.status == Trial.TERMINATED
for t in runner.get_trials()):
runner.step()
runner.add_trial(
Trial(
"__fake",
trial_id="checkpoint",
checkpoint_at_end=True,
stopping_criterion={"training_iteration": 2}))
while not all(t.status == Trial.TERMINATED
for t in runner.get_trials()):
runner.step()
runner.add_trial(
Trial(
"__fake",
trial_id="pending",
stopping_criterion={"training_iteration": 2}))
runner.step()
runner.step()
runner2 = TrialRunner(resume="LOCAL", local_checkpoint_dir=tmpdir)
new_trials = runner2.get_trials()
self.assertEquals(len(new_trials), 3)
self.assertTrue(
runner2.get_trial("non_checkpoint").status == Trial.TERMINATED)
self.assertTrue(
runner2.get_trial("checkpoint").status == Trial.TERMINATED)
self.assertTrue(runner2.get_trial("pending").status == Trial.PENDING)
self.assertTrue(not runner2.get_trial("pending").last_result)
runner2.step()
shutil.rmtree(tmpdir)
def testCheckpointWithFunction(self):
ray.init()
trial = Trial(
"__fake",
config={"callbacks": {
"on_episode_start": lambda i: i,
}},
checkpoint_freq=1)
tmpdir = tempfile.mkdtemp()
runner = TrialRunner(local_checkpoint_dir=tmpdir, checkpoint_period=0)
runner.add_trial(trial)
for i in range(5):
runner.step()
# force checkpoint
runner.checkpoint()
runner2 = TrialRunner(resume="LOCAL", local_checkpoint_dir=tmpdir)
new_trial = runner2.get_trials()[0]
self.assertTrue("callbacks" in new_trial.config)
self.assertTrue("on_episode_start" in new_trial.config["callbacks"])
shutil.rmtree(tmpdir)
def testCheckpointOverwrite(self):
def count_checkpoints(cdir):
return sum((fname.startswith("experiment_state")
and fname.endswith(".json"))
for fname in os.listdir(cdir))
ray.init()
trial = Trial("__fake", checkpoint_freq=1)
tmpdir = tempfile.mkdtemp()
runner = TrialRunner(local_checkpoint_dir=tmpdir, checkpoint_period=0)
runner.add_trial(trial)
for i in range(5):
runner.step()
# force checkpoint
runner.checkpoint()
self.assertEquals(count_checkpoints(tmpdir), 1)
runner2 = TrialRunner(resume="LOCAL", local_checkpoint_dir=tmpdir)
for i in range(5):
runner2.step()
self.assertEquals(count_checkpoints(tmpdir), 2)
runner2.checkpoint()
self.assertEquals(count_checkpoints(tmpdir), 2)
shutil.rmtree(tmpdir)
def testUserCheckpoint(self):
ray.init(num_cpus=3)
tmpdir = tempfile.mkdtemp()
runner = TrialRunner(local_checkpoint_dir=tmpdir, checkpoint_period=0)
runner.add_trial(Trial("__fake", config={"user_checkpoint_freq": 2}))
trials = runner.get_trials()
runner.step()
self.assertEqual(trials[0].status, Trial.RUNNING)
self.assertEqual(ray.get(trials[0].runner.set_info.remote(1)), 1)
runner.step() # 0
self.assertFalse(trials[0].has_checkpoint())
runner.step() # 1
self.assertFalse(trials[0].has_checkpoint())
runner.step() # 2
self.assertTrue(trials[0].has_checkpoint())
runner2 = TrialRunner(resume="LOCAL", local_checkpoint_dir=tmpdir)
runner2.step()
trials2 = runner2.get_trials()
self.assertEqual(ray.get(trials2[0].runner.get_info.remote()), 1)
shutil.rmtree(tmpdir)
class SearchAlgorithmTest(unittest.TestCase):
def testNestedSuggestion(self):
class TestSuggestion(SuggestionAlgorithm):
def _suggest(self, trial_id):
return {"a": {"b": {"c": {"d": 4, "e": 5}}}}
alg = TestSuggestion()
alg.add_configurations({"test": {"run": "__fake"}})
trial = alg.next_trials()[0]
self.assertTrue("e=5" in trial.experiment_tag)
self.assertTrue("d=4" in trial.experiment_tag)
class ResourcesTest(unittest.TestCase):
def testSubtraction(self):
resource_1 = Resources(
1,
0,
0,
1,
custom_resources={
"a": 1,
"b": 2
},
extra_custom_resources={
"a": 1,
"b": 1
})
resource_2 = Resources(
1,
0,
0,
1,
custom_resources={
"a": 1,
"b": 2
},
extra_custom_resources={
"a": 1,
"b": 1
})
new_res = Resources.subtract(resource_1, resource_2)
self.assertTrue(new_res.cpu == 0)
self.assertTrue(new_res.gpu == 0)
self.assertTrue(new_res.extra_cpu == 0)
self.assertTrue(new_res.extra_gpu == 0)
self.assertTrue(all(k == 0 for k in new_res.custom_resources.values()))
self.assertTrue(
all(k == 0 for k in new_res.extra_custom_resources.values()))
def testDifferentResources(self):
resource_1 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
resource_2 = Resources(1, 0, 0, 1, custom_resources={"a": 1, "c": 2})
new_res = Resources.subtract(resource_1, resource_2)
assert "c" in new_res.custom_resources
assert "b" in new_res.custom_resources
self.assertTrue(new_res.cpu == 0)
self.assertTrue(new_res.gpu == 0)
self.assertTrue(new_res.extra_cpu == 0)
self.assertTrue(new_res.extra_gpu == 0)
self.assertTrue(new_res.get("a") == 0)
def testSerialization(self):
original = Resources(1, 0, 0, 1, custom_resources={"a": 1, "b": 2})
jsoned = resources_to_json(original)
new_resource = json_to_resources(jsoned)
self.assertEquals(original, new_resource)
if __name__ == "__main__":
import pytest
sys.exit(pytest.main(["-v", __file__]))