mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 21:11:24 +08:00
Fix flaky test_actors_and_tasks_with_gpus_version_two test. (#5756)
This commit is contained in:
@@ -1186,7 +1186,7 @@ def test_actor_multiple_gpus_from_multiple_tasks(ray_start_cluster):
|
||||
def test_actors_and_tasks_with_gpus(ray_start_cluster):
|
||||
cluster = ray_start_cluster
|
||||
num_nodes = 3
|
||||
num_gpus_per_raylet = 6
|
||||
num_gpus_per_raylet = 2
|
||||
for i in range(num_nodes):
|
||||
cluster.add_node(
|
||||
num_cpus=num_gpus_per_raylet, num_gpus=num_gpus_per_raylet)
|
||||
@@ -1211,7 +1211,7 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
|
||||
@ray.remote(num_gpus=1)
|
||||
def f1():
|
||||
t1 = time.monotonic()
|
||||
time.sleep(0.4)
|
||||
time.sleep(0.1)
|
||||
t2 = time.monotonic()
|
||||
gpu_ids = ray.get_gpu_ids()
|
||||
assert len(gpu_ids) == 1
|
||||
@@ -1222,7 +1222,7 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
|
||||
@ray.remote(num_gpus=2)
|
||||
def f2():
|
||||
t1 = time.monotonic()
|
||||
time.sleep(0.4)
|
||||
time.sleep(0.1)
|
||||
t2 = time.monotonic()
|
||||
gpu_ids = ray.get_gpu_ids()
|
||||
assert len(gpu_ids) == 2
|
||||
@@ -1258,8 +1258,6 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
|
||||
|
||||
# Run a bunch of GPU tasks.
|
||||
locations_to_intervals = locations_to_intervals_for_many_tasks()
|
||||
# Make sure that all GPUs were used.
|
||||
assert (len(locations_to_intervals) == num_nodes * num_gpus_per_raylet)
|
||||
# For each GPU, verify that the set of tasks that used this specific
|
||||
# GPU did not overlap in time.
|
||||
for locations in locations_to_intervals:
|
||||
@@ -1275,8 +1273,6 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
|
||||
|
||||
# Run a bunch of GPU tasks.
|
||||
locations_to_intervals = locations_to_intervals_for_many_tasks()
|
||||
# Make sure that all but one of the GPUs were used.
|
||||
assert (len(locations_to_intervals) == num_nodes * num_gpus_per_raylet - 1)
|
||||
# For each GPU, verify that the set of tasks that used this specific
|
||||
# GPU did not overlap in time.
|
||||
for locations in locations_to_intervals:
|
||||
@@ -1284,28 +1280,9 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
|
||||
# Make sure that the actor's GPU was not used.
|
||||
assert actor_location not in locations_to_intervals
|
||||
|
||||
# Create several more actors that use GPUs.
|
||||
actors = [Actor1.remote() for _ in range(3)]
|
||||
actor_locations = ray.get(
|
||||
[actor.get_location_and_ids.remote() for actor in actors])
|
||||
|
||||
# Run a bunch of GPU tasks.
|
||||
locations_to_intervals = locations_to_intervals_for_many_tasks()
|
||||
# Make sure that all but 4 of the GPUs were used.
|
||||
assert (
|
||||
len(locations_to_intervals) == num_nodes * num_gpus_per_raylet - 1 - 3)
|
||||
# For each GPU, verify that the set of tasks that used this specific
|
||||
# GPU did not overlap in time.
|
||||
for locations in locations_to_intervals:
|
||||
check_intervals_non_overlapping(locations_to_intervals[locations])
|
||||
# Make sure that the actors' GPUs were not used.
|
||||
assert actor_location not in locations_to_intervals
|
||||
for location in actor_locations:
|
||||
assert location not in locations_to_intervals
|
||||
|
||||
# Create more actors to fill up all the GPUs.
|
||||
more_actors = [
|
||||
Actor1.remote() for _ in range(num_nodes * num_gpus_per_raylet - 1 - 3)
|
||||
Actor1.remote() for _ in range(num_nodes * num_gpus_per_raylet - 1)
|
||||
]
|
||||
# Wait for the actors to finish being created.
|
||||
ray.get([actor.get_location_and_ids.remote() for actor in more_actors])
|
||||
@@ -1319,38 +1296,71 @@ def test_actors_and_tasks_with_gpus(ray_start_cluster):
|
||||
def test_actors_and_tasks_with_gpus_version_two(shutdown_only):
|
||||
# Create tasks and actors that both use GPUs and make sure that they
|
||||
# are given different GPUs
|
||||
num_gpus = 4
|
||||
|
||||
ray.init(
|
||||
num_cpus=10, num_gpus=10, object_store_memory=int(150 * 1024 * 1024))
|
||||
num_cpus=(num_gpus + 1),
|
||||
num_gpus=num_gpus,
|
||||
object_store_memory=int(150 * 1024 * 1024))
|
||||
|
||||
# The point of this actor is to record which GPU IDs have been seen. We
|
||||
# can't just return them from the tasks, because the tasks don't return
|
||||
# for a long time in order to make sure the GPU is not released
|
||||
# prematurely.
|
||||
@ray.remote
|
||||
class RecordGPUs(object):
|
||||
def __init__(self):
|
||||
self.gpu_ids_seen = []
|
||||
self.num_calls = 0
|
||||
|
||||
def add_ids(self, gpu_ids):
|
||||
self.gpu_ids_seen += gpu_ids
|
||||
self.num_calls += 1
|
||||
|
||||
def get_gpu_ids_and_calls(self):
|
||||
return self.gpu_ids_seen, self.num_calls
|
||||
|
||||
@ray.remote(num_gpus=1)
|
||||
def f():
|
||||
time.sleep(5)
|
||||
def f(record_gpu_actor):
|
||||
gpu_ids = ray.get_gpu_ids()
|
||||
assert len(gpu_ids) == 1
|
||||
return gpu_ids[0]
|
||||
record_gpu_actor.add_ids.remote(gpu_ids)
|
||||
# Sleep for a long time so that the GPU never gets released. This task
|
||||
# will be killed by ray.shutdown() before it actually finishes.
|
||||
time.sleep(1000)
|
||||
|
||||
@ray.remote(num_gpus=1)
|
||||
class Actor(object):
|
||||
def __init__(self):
|
||||
def __init__(self, record_gpu_actor):
|
||||
self.gpu_ids = ray.get_gpu_ids()
|
||||
assert len(self.gpu_ids) == 1
|
||||
record_gpu_actor.add_ids.remote(self.gpu_ids)
|
||||
|
||||
def get_gpu_id(self):
|
||||
def check_gpu_ids(self):
|
||||
assert ray.get_gpu_ids() == self.gpu_ids
|
||||
return self.gpu_ids[0]
|
||||
|
||||
results = []
|
||||
record_gpu_actor = RecordGPUs.remote()
|
||||
|
||||
actors = []
|
||||
for _ in range(5):
|
||||
results.append(f.remote())
|
||||
a = Actor.remote()
|
||||
results.append(a.get_gpu_id.remote())
|
||||
actor_results = []
|
||||
for _ in range(num_gpus // 2):
|
||||
f.remote(record_gpu_actor)
|
||||
a = Actor.remote(record_gpu_actor)
|
||||
actor_results.append(a.check_gpu_ids.remote())
|
||||
# Prevent the actor handle from going out of scope so that its GPU
|
||||
# resources don't get released.
|
||||
actors.append(a)
|
||||
|
||||
gpu_ids = ray.get(results)
|
||||
assert set(gpu_ids) == set(range(10))
|
||||
# Make sure that the actor method calls succeeded.
|
||||
ray.get(actor_results)
|
||||
|
||||
start_time = time.time()
|
||||
while time.time() - start_time < 30:
|
||||
seen_gpu_ids, num_calls = ray.get(
|
||||
record_gpu_actor.get_gpu_ids_and_calls.remote())
|
||||
if num_calls == num_gpus:
|
||||
break
|
||||
assert set(seen_gpu_ids) == set(range(num_gpus))
|
||||
|
||||
|
||||
def test_blocking_actor_task(shutdown_only):
|
||||
|
||||
@@ -4,12 +4,13 @@ from __future__ import print_function
|
||||
|
||||
import fnmatch
|
||||
import os
|
||||
import psutil
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
import psutil
|
||||
|
||||
import ray
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user