diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 900791219..af61a9bc1 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -1186,7 +1186,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster): def test_actors_and_tasks_with_gpus(ray_start_cluster): cluster = ray_start_cluster num_nodes = 3 - num_gpus_per_raylet = 6 + num_gpus_per_raylet = 2 for i in range(num_nodes): cluster.add_node( num_cpus=num_gpus_per_raylet, num_gpus=num_gpus_per_raylet) @@ -1211,7 +1211,7 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster): @ray.remote(num_gpus=1) def f1(): t1 = time.monotonic() - time.sleep(0.4) + time.sleep(0.1) t2 = time.monotonic() gpu_ids = ray.get_gpu_ids() assert len(gpu_ids) == 1 @@ -1222,7 +1222,7 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster): @ray.remote(num_gpus=2) def f2(): t1 = time.monotonic() - time.sleep(0.4) + time.sleep(0.1) t2 = time.monotonic() gpu_ids = ray.get_gpu_ids() assert len(gpu_ids) == 2 @@ -1258,8 +1258,6 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster): # Run a bunch of GPU tasks. locations_to_intervals = locations_to_intervals_for_many_tasks() - # Make sure that all GPUs were used. - assert (len(locations_to_intervals) == num_nodes * num_gpus_per_raylet) # For each GPU, verify that the set of tasks that used this specific # GPU did not overlap in time. for locations in locations_to_intervals: @@ -1275,8 +1273,6 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster): # Run a bunch of GPU tasks. locations_to_intervals = locations_to_intervals_for_many_tasks() - # Make sure that all but one of the GPUs were used. - assert (len(locations_to_intervals) == num_nodes * num_gpus_per_raylet - 1) # For each GPU, verify that the set of tasks that used this specific # GPU did not overlap in time. for locations in locations_to_intervals: @@ -1284,28 +1280,9 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster): # Make sure that the actor's GPU was not used. assert actor_location not in locations_to_intervals - # Create several more actors that use GPUs. - actors = [Actor1.remote() for _ in range(3)] - actor_locations = ray.get( - [actor.get_location_and_ids.remote() for actor in actors]) - - # Run a bunch of GPU tasks. - locations_to_intervals = locations_to_intervals_for_many_tasks() - # Make sure that all but 11 of the GPUs were used. - assert ( - len(locations_to_intervals) == num_nodes * num_gpus_per_raylet - 1 - 3) - # For each GPU, verify that the set of tasks that used this specific - # GPU did not overlap in time. - for locations in locations_to_intervals: - check_intervals_non_overlapping(locations_to_intervals[locations]) - # Make sure that the GPUs were not used. - assert actor_location not in locations_to_intervals - for location in actor_locations: - assert location not in locations_to_intervals - # Create more actors to fill up all the GPUs. more_actors = [ - Actor1.remote() for _ in range(num_nodes * num_gpus_per_raylet - 1 - 3) + Actor1.remote() for _ in range(num_nodes * num_gpus_per_raylet - 1) ] # Wait for the actors to finish being created. ray.get([actor.get_location_and_ids.remote() for actor in more_actors]) @@ -1319,38 +1296,71 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster): def test_actors_and_tasks_with_gpus_version_two(shutdown_only): # Create tasks and actors that both use GPUs and make sure that they # are given different GPUs + num_gpus = 4 + ray.init( - num_cpus=10, num_gpus=10, object_store_memory=int(150 * 1024 * 1024)) + num_cpus=(num_gpus + 1), + num_gpus=num_gpus, + object_store_memory=int(150 * 1024 * 1024)) + + # The point of this actor is to record which GPU IDs have been seen. We + # can't just return them from the tasks, because the tasks don't return + # for a long time in order to make sure the GPU is not released + # prematurely. + @ray.remote + class RecordGPUs(object): + def __init__(self): + self.gpu_ids_seen = [] + self.num_calls = 0 + + def add_ids(self, gpu_ids): + self.gpu_ids_seen += gpu_ids + self.num_calls += 1 + + def get_gpu_ids_and_calls(self): + return self.gpu_ids_seen, self.num_calls @ray.remote(num_gpus=1) - def f(): - time.sleep(5) + def f(record_gpu_actor): gpu_ids = ray.get_gpu_ids() assert len(gpu_ids) == 1 - return gpu_ids[0] + record_gpu_actor.add_ids.remote(gpu_ids) + # Sleep for a long time so that the GPU never gets released. This task + # will be killed by ray.shutdown() before it actually finishes. + time.sleep(1000) @ray.remote(num_gpus=1) class Actor(object): - def __init__(self): + def __init__(self, record_gpu_actor): self.gpu_ids = ray.get_gpu_ids() assert len(self.gpu_ids) == 1 + record_gpu_actor.add_ids.remote(self.gpu_ids) - def get_gpu_id(self): + def check_gpu_ids(self): assert ray.get_gpu_ids() == self.gpu_ids - return self.gpu_ids[0] - results = [] + record_gpu_actor = RecordGPUs.remote() + actors = [] - for _ in range(5): - results.append(f.remote()) - a = Actor.remote() - results.append(a.get_gpu_id.remote()) + actor_results = [] + for _ in range(num_gpus // 2): + f.remote(record_gpu_actor) + a = Actor.remote(record_gpu_actor) + actor_results.append(a.check_gpu_ids.remote()) # Prevent the actor handle from going out of scope so that its GPU # resources don't get released. actors.append(a) - gpu_ids = ray.get(results) - assert set(gpu_ids) == set(range(10)) + # Make sure that the actor method calls succeeded. + ray.get(actor_results) + + start_time = time.time() + while time.time() - start_time < 30: + seen_gpu_ids, num_calls = ray.get( + record_gpu_actor.get_gpu_ids_and_calls.remote()) + if num_calls == num_gpus: + break + assert set(seen_gpu_ids) == set(range(num_gpus)) def test_blocking_actor_task(shutdown_only): diff --git a/python/ray/tests/utils.py b/python/ray/tests/utils.py index ea14adfb8..ea96f6c9e 100644 --- a/python/ray/tests/utils.py +++ b/python/ray/tests/utils.py @@ -4,12 +4,13 @@ from __future__ import print_function import fnmatch import os -import psutil import subprocess import sys import tempfile import time +import psutil + import ray