Files
ray/python/ray/tests/test_advanced_2.py
T

754 lines
22 KiB
Python

# coding: utf-8
import logging
import os
import sys
import time
import numpy as np
import pytest
import ray
import ray.cluster_utils
import ray.test_utils
from ray.test_utils import (
RayTestTimeoutException,
wait_for_condition,
)
logger = logging.getLogger(__name__)
def test_resource_constraints(shutdown_only):
    """Check that CPU/GPU requirements cap how many tasks run at once.

    Ray is started with 10 CPUs and 2 GPUs. Batches of tasks that each
    sleep for 0.5 seconds are timed: a batch that fits within the
    available resources should take roughly 0.5 seconds, and a batch that
    exceeds them by at least one task should take roughly 1 second.

    Args:
        shutdown_only: pytest fixture defined outside this file.

    Raises:
        RayTestTimeoutException: if the expected number of distinct
            workers does not start within the startup timeout.
    """
    num_workers = 20
    ray.init(num_cpus=10, num_gpus=2)

    @ray.remote(num_cpus=0)
    def get_worker_id():
        time.sleep(0.1)
        return os.getpid()

    # Attempt to wait for all of the workers to start up. The wait is
    # bounded so that a cluster that never reaches `num_workers` distinct
    # worker processes fails the test instead of hanging it forever.
    startup_timeout_s = 60
    deadline = time.time() + startup_timeout_s
    while True:
        worker_pids = set(
            ray.get([get_worker_id.remote() for _ in range(num_workers)]))
        if len(worker_pids) == num_workers:
            break
        if time.time() > deadline:
            raise RayTestTimeoutException(
                "Timed out while waiting for workers to start "
                "up.")

    time_buffer = 2

    def check_duration(remote_fn, num_tasks, expected_duration):
        # Run `num_tasks` copies of a 0.5 second task and check that the
        # wall-clock time falls in
        # (expected_duration, expected_duration + time_buffer).
        start_time = time.time()
        ray.get([remote_fn.remote(0.5) for _ in range(num_tasks)])
        duration = time.time() - start_time
        assert duration < expected_duration + time_buffer
        assert duration > expected_duration

    # At most 10 copies of this can run at once.
    @ray.remote(num_cpus=1)
    def one_cpu_task(n):
        time.sleep(n)

    check_duration(one_cpu_task, 10, 0.5)
    check_duration(one_cpu_task, 11, 1)

    # At most 3 copies of this can run at once (3 * 3 CPUs <= 10 CPUs).
    @ray.remote(num_cpus=3)
    def three_cpu_task(n):
        time.sleep(n)

    check_duration(three_cpu_task, 3, 0.5)
    check_duration(three_cpu_task, 4, 1)

    # At most 2 copies of this can run at once (one per GPU).
    @ray.remote(num_gpus=1)
    def one_gpu_task(n):
        time.sleep(n)

    check_duration(one_gpu_task, 2, 0.5)
    check_duration(one_gpu_task, 3, 1)
    check_duration(one_gpu_task, 4, 1)
def test_multi_resource_constraints(shutdown_only):
    """Check scheduling of tasks that need both CPUs and GPUs.

    Ray is started with 10 CPUs and 10 GPUs. Task ``f`` needs 1 CPU and
    9 GPUs, task ``g`` needs 9 CPUs and 1 GPU, so one ``f`` and one ``g``
    fit side by side, but two of the same kind do not.

    Args:
        shutdown_only: pytest fixture defined outside this file.

    Raises:
        RayTestTimeoutException: if the expected number of distinct
            workers does not start within the startup timeout.
    """
    num_workers = 20
    ray.init(num_cpus=10, num_gpus=10)

    @ray.remote(num_cpus=0)
    def get_worker_id():
        time.sleep(0.1)
        return os.getpid()

    # Attempt to wait for all of the workers to start up. The wait is
    # bounded so that a cluster that never reaches `num_workers` distinct
    # worker processes fails the test instead of hanging it forever.
    startup_timeout_s = 60
    deadline = time.time() + startup_timeout_s
    while True:
        worker_pids = set(
            ray.get([get_worker_id.remote() for _ in range(num_workers)]))
        if len(worker_pids) == num_workers:
            break
        if time.time() > deadline:
            raise RayTestTimeoutException(
                "Timed out while waiting for workers to start "
                "up.")

    @ray.remote(num_cpus=1, num_gpus=9)
    def f(n):
        time.sleep(n)

    @ray.remote(num_cpus=9, num_gpus=1)
    def g(n):
        time.sleep(n)

    time_buffer = 2

    def check_duration(remote_fns, expected_duration):
        # Launch one 0.5 second task per entry of `remote_fns` and check
        # that the wall-clock time falls in
        # (expected_duration, expected_duration + time_buffer).
        start_time = time.time()
        ray.get([remote_fn.remote(0.5) for remote_fn in remote_fns])
        duration = time.time() - start_time
        assert duration < expected_duration + time_buffer
        assert duration > expected_duration

    # One f and one g fit together: 10 CPUs and 10 GPUs in total.
    check_duration([f, g], 0.5)
    # Two f tasks would need 18 GPUs, so they run one after the other.
    check_duration([f, f], 1)
    # Two g tasks would need 18 CPUs, so they run one after the other.
    check_duration([g, g], 1)
    # Expected to complete as two rounds, each pairing one f with one g.
    check_duration([f, f, g, g], 1)
def test_gpu_ids(shutdown_only):
    """Check GPU IDs and CUDA_VISIBLE_DEVICES seen by tasks and actors.

    Ray is started with 3 CPUs and 3 GPUs. Tasks requesting 0, 1 and 2
    GPUs, and actors requesting 0 and 1 GPUs, each verify that
    ``ray.get_gpu_ids()`` has the requested length and matches the
    ``CUDA_VISIBLE_DEVICES`` environment variable.
    """
    num_gpus = 3
    ray.init(num_cpus=num_gpus, num_gpus=num_gpus)

    def get_gpu_ids(num_gpus_per_worker):
        # Validate the GPU assignment of the calling worker and return it.
        assigned = ray.get_gpu_ids()
        assert len(assigned) == num_gpus_per_worker
        visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
        assert visible_devices == ",".join(str(gpu) for gpu in assigned)
        valid_ids = range(num_gpus)
        for gpu in assigned:
            assert gpu in valid_ids
        return assigned

    @ray.remote(num_gpus=0)
    def f0():
        return get_gpu_ids(0)

    @ray.remote(num_gpus=1)
    def f1():
        return get_gpu_ids(1)

    @ray.remote(num_gpus=2)
    def f2():
        return get_gpu_ids(2)

    # Wait for all workers to start up.
    @ray.remote
    def worker_pid():
        time.sleep(0.2)
        return os.getpid()

    wait_started_at = time.time()
    while True:
        distinct_pids = set(
            ray.get([worker_pid.remote() for _ in range(num_gpus)]))
        if len(distinct_pids) == num_gpus:
            break
        if time.time() > wait_started_at + 10:
            raise RayTestTimeoutException(
                "Timed out while waiting for workers to start "
                "up.")

    # Tasks that request no GPU must see an empty ID list.
    no_gpu_results = ray.get([f0.remote() for _ in range(10)])
    assert no_gpu_results == [[] for _ in range(10)]
    ray.get([f1.remote() for _ in range(10)])
    ray.get([f2.remote() for _ in range(10)])

    # Test that actors have CUDA_VISIBLE_DEVICES set properly.
    @ray.remote
    class Actor0:
        def __init__(self):
            assigned = ray.get_gpu_ids()
            assert len(assigned) == 0
            visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
            assert visible_devices == ",".join(str(g) for g in assigned)
            # Set self.x to make sure that we got here.
            self.x = 1

        def test(self):
            assigned = ray.get_gpu_ids()
            assert len(assigned) == 0
            visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
            assert visible_devices == ",".join(str(g) for g in assigned)
            return self.x

    @ray.remote(num_gpus=1)
    class Actor1:
        def __init__(self):
            assigned = ray.get_gpu_ids()
            assert len(assigned) == 1
            visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
            assert visible_devices == ",".join(str(g) for g in assigned)
            # Set self.x to make sure that we got here.
            self.x = 1

        def test(self):
            assigned = ray.get_gpu_ids()
            assert len(assigned) == 1
            visible_devices = os.environ["CUDA_VISIBLE_DEVICES"]
            assert visible_devices == ",".join(str(g) for g in assigned)
            return self.x

    actor_without_gpu = Actor0.remote()
    ray.get(actor_without_gpu.test.remote())

    actor_with_gpu = Actor1.remote()
    ray.get(actor_with_gpu.test.remote())
def test_zero_cpus(shutdown_only):
    """Check that zero-CPU tasks and actors run on a zero-CPU cluster."""
    ray.init(num_cpus=0)

    # A task that requires 0 CPU resources must still be executable.
    @ray.remote(num_cpus=0)
    def f():
        return 1

    task_ref = f.remote()
    ray.get(task_ref)

    # The same holds for an actor that requires 0 CPU resources.
    @ray.remote(num_cpus=0)
    class Actor:
        def method(self):
            pass

    actor = Actor.remote()
    ray.get(actor.method.remote())
def test_zero_cpus_actor(ray_start_cluster):
    """Check that an actor is not placed on a node that has zero CPUs.

    The cluster has one node with 0 CPUs and one node with 2 CPUs; the
    actor must report the unique ID of the 2-CPU node.
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=0)
    valid_node = cluster.add_node(num_cpus=2)
    ray.init(address=cluster.address)

    @ray.remote
    class Foo:
        def method(self):
            return ray.worker.global_worker.node.unique_id

    # Make sure tasks and actors run on the remote raylet.
    actor = Foo.remote()
    reported_node_id = ray.get(actor.method.remote())
    assert valid_node.unique_id == reported_node_id
def test_fractional_resources(shutdown_only):
    """Check scheduling with fractional GPU and custom resource amounts.

    Ray is started with 6 CPUs, 3 GPUs and one unit of the "Custom"
    resource. The test covers half-GPU actors sharing GPUs, fractional
    custom-resource actors, and rejection of fractional amounts above 1.
    """
    ray.init(num_cpus=6, num_gpus=3, resources={"Custom": 1})

    @ray.remote(num_gpus=0.5)
    class Foo1:
        def method(self):
            assigned = ray.get_gpu_ids()
            assert len(assigned) == 1
            return assigned[0]

    # Six half-GPU actors on three GPUs: each GPU must be used twice.
    foos = [Foo1.remote() for _ in range(6)]
    gpu_ids = ray.get([foo.method.remote() for foo in foos])
    for gpu_index in range(3):
        assert gpu_ids.count(gpu_index) == 2
    del foos

    @ray.remote
    class Foo2:
        def method(self):
            pass

    # Create an actor that requires 0.7 of the custom resource.
    f1 = Foo2._remote([], {}, resources={"Custom": 0.7})
    ray.get(f1.method.remote())
    # A second actor that also requires 0.7 of the custom resource cannot
    # be created while the first one holds its share, so a call on it
    # must not become ready within the wait timeout.
    f2 = Foo2._remote([], {}, resources={"Custom": 0.7})
    ready, _ = ray.wait([f2.method.remote()], timeout=0.5)
    assert len(ready) == 0
    # Make sure we can start an actor that requires only 0.3 of the
    # custom resource.
    f3 = Foo2._remote([], {}, resources={"Custom": 0.3})
    ray.get(f3.method.remote())

    del f1, f3

    # Make sure that we get exceptions if we submit tasks that require a
    # fractional number of resources greater than 1.
    @ray.remote(num_cpus=1.5)
    def fractional_cpu_task():
        pass

    with pytest.raises(ValueError):
        fractional_cpu_task.remote()

    with pytest.raises(ValueError):
        Foo2._remote([], {}, resources={"Custom": 1.5})
def test_multiple_raylets(ray_start_cluster):
    """Check that tasks land only on raylets that satisfy their resources.

    This test defines a bunch of tasks that can only be assigned to
    specific raylets, and checks that they are assigned to the correct
    raylets, both when submitted from the driver and when submitted from
    inside nested tasks.
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=11, num_gpus=0)
    cluster.add_node(num_cpus=5, num_gpus=5)
    cluster.add_node(num_cpus=10, num_gpus=1)
    ray.init(address=cluster.address)
    cluster.wait_for_nodes()

    # Define a bunch of remote functions that all return the socket name of
    # the plasma store. Since there is a one-to-one correspondence between
    # plasma stores and raylets (at least right now), this can be
    # used to identify which raylet the task was assigned to.

    # This must be run on the zeroth raylet.
    @ray.remote(num_cpus=11)
    def run_on_0():
        return ray.worker.global_worker.node.plasma_store_socket_name

    # This must be run on the first raylet.
    @ray.remote(num_gpus=2)
    def run_on_1():
        return ray.worker.global_worker.node.plasma_store_socket_name

    # This must be run on the second raylet.
    @ray.remote(num_cpus=6, num_gpus=1)
    def run_on_2():
        return ray.worker.global_worker.node.plasma_store_socket_name

    # This can be run anywhere.
    @ray.remote(num_cpus=0, num_gpus=0)
    def run_on_0_1_2():
        return ray.worker.global_worker.node.plasma_store_socket_name

    # This must be run on the first or second raylet.
    @ray.remote(num_gpus=1)
    def run_on_1_2():
        return ray.worker.global_worker.node.plasma_store_socket_name

    # This must be run on the zeroth or second raylet.
    @ray.remote(num_cpus=8)
    def run_on_0_2():
        return ray.worker.global_worker.node.plasma_store_socket_name

    def run_lots_of_tasks():
        # Submit 100 tasks, each picked uniformly at random from the six
        # remote functions, and return the chosen names with their refs.
        candidates = (
            ("run_on_0", run_on_0),
            ("run_on_1", run_on_1),
            ("run_on_2", run_on_2),
            ("run_on_0_1_2", run_on_0_1_2),
            ("run_on_1_2", run_on_1_2),
            ("run_on_0_2", run_on_0_2),
        )
        names = []
        results = []
        for _ in range(100):
            label, remote_fn = candidates[np.random.randint(6)]
            names.append(label)
            results.append(remote_fn.remote())
        return names, results

    # Order the store socket names as raylet 0, 1, 2 by identifying each
    # node through its GPU count (0, 5 and 1 respectively).
    client_table = ray.nodes()
    store_names = []
    for gpu_count in (0, 5, 1):
        store_names.extend(
            client["ObjectStoreSocketName"] for client in client_table
            if client["Resources"].get("GPU", 0) == gpu_count)
    assert len(store_names) == 3

    def validate_names_and_results(names, results):
        # Map each task name to the store sockets it is allowed to report.
        allowed_stores = {
            "run_on_0": [store_names[0]],
            "run_on_1": [store_names[1]],
            "run_on_2": [store_names[2]],
            "run_on_0_1_2": [
                store_names[0], store_names[1], store_names[2]
            ],
            "run_on_1_2": [store_names[1], store_names[2]],
            "run_on_0_2": [store_names[0], store_names[2]],
        }
        for name, result in zip(names, ray.get(results)):
            if name not in allowed_stores:
                raise Exception("This should be unreachable.")
            assert result in allowed_stores[name]
        # Between them, the tasks must have covered every raylet.
        assert set(ray.get(results)) == set(store_names)

    names, results = run_lots_of_tasks()
    validate_names_and_results(names, results)

    # Make sure the same thing works when this is nested inside of a task.

    @ray.remote
    def run_nested1():
        nested_names, nested_results = run_lots_of_tasks()
        return nested_names, nested_results

    @ray.remote
    def run_nested2():
        nested_names, nested_results = ray.get(run_nested1.remote())
        return nested_names, nested_results

    names, results = ray.get(run_nested2.remote())
    validate_names_and_results(names, results)
def test_custom_resources(ray_start_cluster):
    """Check that tasks needing a custom resource run where it exists.

    Only the second node is given one unit of "CustomResource"; the first
    node is given zero.
    """
    cluster = ray_start_cluster
    cluster.add_node(num_cpus=1, resources={"CustomResource": 0})
    custom_resource_node = cluster.add_node(
        num_cpus=1, resources={"CustomResource": 1})
    ray.init(address=cluster.address)

    @ray.remote
    def f():
        return ray.worker.global_worker.node.unique_id

    @ray.remote(resources={"CustomResource": 1})
    def g():
        return ray.worker.global_worker.node.unique_id

    @ray.remote(resources={"CustomResource": 1})
    def h():
        # Block on plain tasks while holding the custom resource.
        ray.get([f.remote() for _ in range(5)])
        return ray.worker.global_worker.node.unique_id

    # The g tasks should be scheduled only on the second raylet.
    g_refs = [g.remote() for _ in range(50)]
    raylet_ids = set(ray.get(g_refs))
    assert len(raylet_ids) == 1
    assert list(raylet_ids)[0] == custom_resource_node.unique_id

    # Make sure that resource bookkeeping works when a task that uses a
    # custom resources gets blocked.
    h_refs = [h.remote() for _ in range(5)]
    ray.get(h_refs)
def test_node_id_resource(ray_start_cluster):
    """Check that the current node's ID can be used as a resource name."""
    cluster = ray_start_cluster
    for _ in range(2):
        cluster.add_node(num_cpus=3)
    ray.init(address=cluster.address)

    local_node = ray.state.current_node_id()

    # Note that these will have the same IP in the test cluster
    assert len(ray.state.node_ids()) == 2
    assert local_node in ray.state.node_ids()

    @ray.remote(resources={local_node: 1})
    def f():
        return ray.state.current_node_id()

    # Check the node id resource is automatically usable for scheduling.
    node_seen_by_task = ray.get(f.remote())
    assert node_seen_by_task == ray.state.current_node_id()
def test_two_custom_resources(ray_start_cluster):
    """Check scheduling with two custom resources spread over two nodes.

    The first node offers 1 unit of "CustomResource1" and 2 units of
    "CustomResource2"; the second offers 3 and 4 units respectively.
    """
    cluster = ray_start_cluster
    cluster.add_node(
        num_cpus=3,
        resources={"CustomResource1": 1, "CustomResource2": 2})
    custom_resource_node = cluster.add_node(
        num_cpus=3,
        resources={"CustomResource1": 3, "CustomResource2": 4})
    ray.init(address=cluster.address)

    @ray.remote
    def foo():
        # Sleep a while to emulate a slow operation. This is needed to make
        # sure tasks are scheduled to different nodes.
        time.sleep(0.1)
        return ray.worker.global_worker.node.unique_id

    def both_nodes_have_workers():
        node_ids = ray.get([foo.remote() for _ in range(6)])
        return len(set(node_ids)) == 2

    # Make sure each node has at least one idle worker.
    wait_for_condition(lambda: both_nodes_have_workers())

    @ray.remote(resources={"CustomResource1": 1})
    def f():
        time.sleep(0.001)
        return ray.worker.global_worker.node.unique_id

    @ray.remote(resources={"CustomResource2": 1})
    def g():
        time.sleep(0.001)
        return ray.worker.global_worker.node.unique_id

    @ray.remote(resources={"CustomResource1": 1, "CustomResource2": 3})
    def h():
        time.sleep(0.001)
        return ray.worker.global_worker.node.unique_id

    @ray.remote(resources={"CustomResource1": 4})
    def j():
        time.sleep(0.001)
        return ray.worker.global_worker.node.unique_id

    @ray.remote(resources={"CustomResource3": 1})
    def k():
        time.sleep(0.001)
        return ray.worker.global_worker.node.unique_id

    # The f and g tasks should be scheduled on both raylets.
    f_node_ids = set(ray.get([f.remote() for _ in range(500)]))
    assert len(f_node_ids) == 2
    g_node_ids = set(ray.get([g.remote() for _ in range(500)]))
    assert len(g_node_ids) == 2

    # The h tasks should be scheduled only on the second raylet.
    raylet_ids = set(ray.get([h.remote() for _ in range(50)]))
    assert len(raylet_ids) == 1
    assert list(raylet_ids)[0] == custom_resource_node.unique_id

    # Make sure that tasks with unsatisfied custom resource requirements do
    # not get scheduled.
    unsatisfiable_refs = [j.remote(), k.remote()]
    ready_ids, _ = ray.wait(unsatisfiable_refs, timeout=0.5)
    assert ready_ids == []
def test_many_custom_resources(shutdown_only):
    """Check that tasks run when the node has many custom resources.

    Ray is started with 10000 custom resources of random capacity in
    [1, 6]. Remote functions requiring random subsets of them, none of
    them, and all of them are each invoked three times.
    """
    num_custom_resources = 10000
    total_resources = {}
    for index in range(num_custom_resources):
        total_resources[str(index)] = np.random.randint(1, 7)
    ray.init(num_cpus=5, resources=total_resources)

    def f():
        return 1

    remote_functions = []
    for _ in range(20):
        # Pick a random-sized random subset of the resources and require
        # each chosen resource at its full capacity.
        num_resources = np.random.randint(0, num_custom_resources + 1)
        shuffled = np.random.permutation(num_custom_resources)
        random_resources = {}
        for index in shuffled[:num_resources]:
            random_resources[str(index)] = total_resources[str(index)]
        remote_functions.append(ray.remote(resources=random_resources)(f))

    # Also cover the two extremes: no custom resources and all of them.
    remote_functions.append(ray.remote(f))
    remote_functions.append(ray.remote(resources=total_resources)(f))

    results = [
        remote_function.remote()
        for remote_function in remote_functions
        for _ in range(3)
    ]

    ray.get(results)
# TODO: 5 retry attempts may be too little for Travis and we may need to
# increase it if this test begins to be flaky on Travis.
def test_zero_capacity_deletion_semantics(shutdown_only):
    """Check that fully used resources vanish from available_resources().

    A single task claims every CPU, GPU and custom resource that Ray was
    started with, and polls ``ray.available_resources()`` from inside the
    task until nothing is left apart from the bookkeeping entries
    ("memory", "object_store_memory" and "node:*" keys).
    """
    ray.init(num_cpus=2, num_gpus=1, resources={"test_resource": 1})

    def delete_miscellaneous_item(resources):
        # Drop, in place, the entries this test does not claim. `pop` with
        # a default is used so that an absent key is not an error.
        resources.pop("memory", None)
        resources.pop("object_store_memory", None)
        for key in list(resources):
            if key.startswith("node:"):
                del resources[key]

    def test():
        MAX_RETRY_ATTEMPTS = 5
        # One initial check plus MAX_RETRY_ATTEMPTS retries, sleeping
        # before each retry.
        for attempt in range(MAX_RETRY_ATTEMPTS + 1):
            if attempt > 0:
                time.sleep(0.1)
            resources = ray.available_resources()
            delete_miscellaneous_item(resources)
            if not resources:
                return resources
        # Only fail when resources are still reported after the final
        # retry; a view that drains exactly on the last retry is a pass.
        raise RuntimeError(
            "Resources were available even after {} retries.".format(
                MAX_RETRY_ATTEMPTS), resources)

    function = ray.remote(
        num_cpus=2, num_gpus=1, resources={"test_resource": 1})(test)
    cluster_resources = ray.get(function.remote())

    # All cluster resources should be utilized and
    # cluster_resources must be empty
    assert cluster_resources == {}
@pytest.fixture
def save_gpu_ids_shutdown_only():
    """Shut Ray down and restore CUDA_VISIBLE_DEVICES after a test.

    Yields:
        None. The test body runs at the yield point.
    """
    # Record the current value of this environment variable so that we can
    # reset it after the test.
    original_gpu_ids = os.environ.get("CUDA_VISIBLE_DEVICES")
    yield None
    # The code after the yield will run as teardown code.
    try:
        ray.shutdown()
    finally:
        # Reset the environment variable even if the shutdown failed.
        if original_gpu_ids is not None:
            os.environ["CUDA_VISIBLE_DEVICES"] = original_gpu_ids
        else:
            # The variable may never have been set (for example when the
            # test failed early), so tolerate its absence.
            os.environ.pop("CUDA_VISIBLE_DEVICES", None)
def test_specific_gpus(save_gpu_ids_shutdown_only):
    """Check that assigned GPU IDs come from CUDA_VISIBLE_DEVICES.

    The environment variable is set to "4,5,6" before Ray starts with
    3 GPUs; every ID handed to a task must be one of those values.
    """
    allowed_gpu_ids = [4, 5, 6]
    visible_devices = ",".join(str(gpu_id) for gpu_id in allowed_gpu_ids)
    os.environ["CUDA_VISIBLE_DEVICES"] = visible_devices
    ray.init(num_gpus=3)

    @ray.remote(num_gpus=1)
    def f():
        assigned = ray.get_gpu_ids()
        assert len(assigned) == 1
        assert int(assigned[0]) in allowed_gpu_ids

    @ray.remote(num_gpus=2)
    def g():
        assigned = ray.get_gpu_ids()
        assert len(assigned) == 2
        first_gpu, second_gpu = assigned[0], assigned[1]
        assert int(first_gpu) in allowed_gpu_ids
        assert int(second_gpu) in allowed_gpu_ids

    single_gpu_refs = [f.remote() for _ in range(100)]
    ray.get(single_gpu_refs)
    double_gpu_refs = [g.remote() for _ in range(100)]
    ray.get(double_gpu_refs)
def test_local_mode_gpus(save_gpu_ids_shutdown_only):
    """Check the GPU IDs reported to tasks when Ray runs in local mode.

    CUDA_VISIBLE_DEVICES is set to "4,5,6,7,8" and ``ray.worker`` is
    reloaded before Ray starts in local mode with 3 GPUs; each task must
    see exactly 3 GPU IDs, all taken from that list.
    """
    allowed_gpu_ids = [4, 5, 6, 7, 8]
    visible_devices = ",".join(str(gpu_id) for gpu_id in allowed_gpu_ids)
    os.environ["CUDA_VISIBLE_DEVICES"] = visible_devices

    from importlib import reload
    reload(ray.worker)

    ray.init(num_gpus=3, local_mode=True)

    @ray.remote
    def f():
        assigned = ray.get_gpu_ids()
        assert len(assigned) == 3
        for gpu_id in assigned:
            assert int(gpu_id) in allowed_gpu_ids

    task_refs = [f.remote() for _ in range(100)]
    ray.get(task_refs)
def test_blocking_tasks(ray_start_regular):
    """Check that tasks can block on other tasks they submitted.

    Covers blocking through ``ray.get`` (``g`` and ``sleep``) and through
    ``ray.wait`` (``h``).
    """
    @ray.remote
    def f(i, j):
        return (i, j)

    @ray.remote
    def g(i):
        # Each instance of g submits and blocks on the result of another
        # remote task.
        object_refs = [f.remote(i, j) for j in range(2)]
        return ray.get(object_refs)

    @ray.remote
    def h(i):
        # Each instance of h submits and blocks on the result of another
        # remote task using ray.wait.
        object_refs = [f.remote(i, j) for j in range(2)]
        return ray.wait(object_refs, num_returns=len(object_refs))

    # Exercise both blocking styles: ray.get inside g, ray.wait inside h.
    g_results = ray.get([g.remote(i) for i in range(4)])
    assert g_results == [[(i, 0), (i, 1)] for i in range(4)]
    ray.get([h.remote(i) for i in range(4)])

    @ray.remote
    def _sleep(i):
        time.sleep(0.01)
        return (i)

    @ray.remote
    def sleep():
        # Each instance of sleep submits and blocks on the result of
        # another remote task, which takes some time to execute.
        ray.get([_sleep.remote(i) for i in range(10)])

    ray.get(sleep.remote())
def test_max_call_tasks(ray_start_regular):
    """Check that a worker process exits after ``max_calls`` invocations."""
    # With max_calls=1 the worker process must exit after a single call.
    @ray.remote(max_calls=1)
    def single_call():
        return os.getpid()

    only_pid = ray.get(single_call.remote())
    ray.test_utils.wait_for_pid_to_exit(only_pid)

    # With max_calls=2 the same process serves both calls and then exits.
    @ray.remote(max_calls=2)
    def double_call():
        return os.getpid()

    first_pid = ray.get(double_call.remote())
    second_pid = ray.get(double_call.remote())
    assert first_pid == second_pid
    ray.test_utils.wait_for_pid_to_exit(first_pid)
if __name__ == "__main__":
    # When executed as a script, run this file's tests verbosely and use
    # pytest's return value as the process exit status.
    import pytest

    exit_status = pytest.main(["-v", __file__])
    sys.exit(exit_status)