mirror of
https://github.com/wassname/ray.git
synced 2026-06-30 15:17:56 +08:00
[CI] Deflake test_basics and skip test_component_failures_3 (#13801)
This commit is contained in:
@@ -23,6 +23,7 @@ py_test_module_list(
|
||||
"test_autoscaling_policy.py",
|
||||
"test_basic.py",
|
||||
"test_basic_2.py",
|
||||
"test_basic_3.py",
|
||||
"test_cancel.py",
|
||||
"test_cli.py",
|
||||
"test_component_failures_2.py",
|
||||
@@ -174,11 +175,12 @@ py_test_module_list(
|
||||
"test_advanced.py",
|
||||
"test_basic.py",
|
||||
"test_basic_2.py",
|
||||
"test_basic_3.py",
|
||||
],
|
||||
size = "medium",
|
||||
extra_srcs = SRCS,
|
||||
name_suffix = "_client_mode",
|
||||
# TODO(barakmich): py_test will support env in Bazel 4.0.0...
|
||||
# TODO(barakmich): py_test will support env in Bazel 4.0.0...
|
||||
# Until then, we can use tags.
|
||||
#env = {"RAY_CLIENT_MODE": "1"},
|
||||
tags = ["exclusive", "client_tests"],
|
||||
|
||||
@@ -9,11 +9,7 @@ import numpy as np
|
||||
import pytest
|
||||
|
||||
import ray.cluster_utils
|
||||
from ray.test_utils import (
|
||||
client_test_enabled,
|
||||
dicts_equal,
|
||||
wait_for_pid_to_exit,
|
||||
)
|
||||
from ray.test_utils import (client_test_enabled)
|
||||
|
||||
import ray
|
||||
|
||||
@@ -170,126 +166,6 @@ def test_invalid_arguments(shutdown_only):
|
||||
x = 1
|
||||
|
||||
|
||||
def test_many_fractional_resources(shutdown_only):
|
||||
ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2})
|
||||
|
||||
@ray.remote
|
||||
def g():
|
||||
return 1
|
||||
|
||||
@ray.remote
|
||||
def f(block, accepted_resources):
|
||||
true_resources = {
|
||||
resource: value[0][1]
|
||||
for resource, value in ray.get_resource_ids().items()
|
||||
}
|
||||
if block:
|
||||
ray.get(g.remote())
|
||||
return dicts_equal(true_resources, accepted_resources)
|
||||
|
||||
# Check that the resource are assigned correctly.
|
||||
result_ids = []
|
||||
for rand1, rand2, rand3 in np.random.uniform(size=(100, 3)):
|
||||
resource_set = {"CPU": int(rand1 * 10000) / 10000}
|
||||
result_ids.append(f._remote([False, resource_set], num_cpus=rand1))
|
||||
|
||||
resource_set = {"CPU": 1, "GPU": int(rand1 * 10000) / 10000}
|
||||
result_ids.append(f._remote([False, resource_set], num_gpus=rand1))
|
||||
|
||||
resource_set = {"CPU": 1, "Custom": int(rand1 * 10000) / 10000}
|
||||
result_ids.append(
|
||||
f._remote([False, resource_set], resources={"Custom": rand1}))
|
||||
|
||||
resource_set = {
|
||||
"CPU": int(rand1 * 10000) / 10000,
|
||||
"GPU": int(rand2 * 10000) / 10000,
|
||||
"Custom": int(rand3 * 10000) / 10000
|
||||
}
|
||||
result_ids.append(
|
||||
f._remote(
|
||||
[False, resource_set],
|
||||
num_cpus=rand1,
|
||||
num_gpus=rand2,
|
||||
resources={"Custom": rand3}))
|
||||
result_ids.append(
|
||||
f._remote(
|
||||
[True, resource_set],
|
||||
num_cpus=rand1,
|
||||
num_gpus=rand2,
|
||||
resources={"Custom": rand3}))
|
||||
assert all(ray.get(result_ids))
|
||||
|
||||
# Check that the available resources at the end are the same as the
|
||||
# beginning.
|
||||
stop_time = time.time() + 10
|
||||
correct_available_resources = False
|
||||
while time.time() < stop_time:
|
||||
available_resources = ray.available_resources()
|
||||
if ("CPU" in available_resources
|
||||
and ray.available_resources()["CPU"] == 2.0
|
||||
and "GPU" in available_resources
|
||||
and ray.available_resources()["GPU"] == 2.0
|
||||
and "Custom" in available_resources
|
||||
and ray.available_resources()["Custom"] == 2.0):
|
||||
correct_available_resources = True
|
||||
break
|
||||
if not correct_available_resources:
|
||||
assert False, "Did not get correct available resources."
|
||||
|
||||
|
||||
def test_background_tasks_with_max_calls(shutdown_only):
|
||||
ray.init(num_cpus=2)
|
||||
|
||||
@ray.remote
|
||||
def g():
|
||||
time.sleep(.1)
|
||||
return 0
|
||||
|
||||
@ray.remote(max_calls=1, max_retries=0)
|
||||
def f():
|
||||
return [g.remote()]
|
||||
|
||||
nested = ray.get([f.remote() for _ in range(10)])
|
||||
|
||||
# Should still be able to retrieve these objects, since f's workers will
|
||||
# wait for g to finish before exiting.
|
||||
ray.get([x[0] for x in nested])
|
||||
|
||||
@ray.remote(max_calls=1, max_retries=0)
|
||||
def f():
|
||||
return os.getpid(), g.remote()
|
||||
|
||||
nested = ray.get([f.remote() for _ in range(10)])
|
||||
while nested:
|
||||
pid, g_id = nested.pop(0)
|
||||
ray.get(g_id)
|
||||
del g_id
|
||||
wait_for_pid_to_exit(pid)
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
|
||||
def test_fair_queueing(shutdown_only):
|
||||
ray.init(num_cpus=1, _system_config={"fair_queueing_enabled": 1})
|
||||
|
||||
@ray.remote
|
||||
def h():
|
||||
return 0
|
||||
|
||||
@ray.remote
|
||||
def g():
|
||||
return ray.get(h.remote())
|
||||
|
||||
@ray.remote
|
||||
def f():
|
||||
return ray.get(g.remote())
|
||||
|
||||
# This will never finish without fair queueing of {f, g, h}:
|
||||
# https://github.com/ray-project/ray/issues/3644
|
||||
ready, _ = ray.wait(
|
||||
[f.remote() for _ in range(1000)], timeout=60.0, num_returns=1000)
|
||||
assert len(ready) == 1000, len(ready)
|
||||
|
||||
|
||||
def test_put_get(shutdown_only):
|
||||
ray.init(num_cpus=0)
|
||||
|
||||
|
||||
@@ -0,0 +1,142 @@
|
||||
# coding: utf-8
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
import ray.cluster_utils
|
||||
from ray.test_utils import (
|
||||
dicts_equal,
|
||||
wait_for_pid_to_exit,
|
||||
)
|
||||
|
||||
import ray
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def test_many_fractional_resources(shutdown_only):
|
||||
ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2})
|
||||
|
||||
@ray.remote
|
||||
def g():
|
||||
return 1
|
||||
|
||||
@ray.remote
|
||||
def f(block, accepted_resources):
|
||||
true_resources = {
|
||||
resource: value[0][1]
|
||||
for resource, value in ray.get_resource_ids().items()
|
||||
}
|
||||
if block:
|
||||
ray.get(g.remote())
|
||||
return dicts_equal(true_resources, accepted_resources)
|
||||
|
||||
# Check that the resource are assigned correctly.
|
||||
result_ids = []
|
||||
for rand1, rand2, rand3 in np.random.uniform(size=(100, 3)):
|
||||
resource_set = {"CPU": int(rand1 * 10000) / 10000}
|
||||
result_ids.append(f._remote([False, resource_set], num_cpus=rand1))
|
||||
|
||||
resource_set = {"CPU": 1, "GPU": int(rand1 * 10000) / 10000}
|
||||
result_ids.append(f._remote([False, resource_set], num_gpus=rand1))
|
||||
|
||||
resource_set = {"CPU": 1, "Custom": int(rand1 * 10000) / 10000}
|
||||
result_ids.append(
|
||||
f._remote([False, resource_set], resources={"Custom": rand1}))
|
||||
|
||||
resource_set = {
|
||||
"CPU": int(rand1 * 10000) / 10000,
|
||||
"GPU": int(rand2 * 10000) / 10000,
|
||||
"Custom": int(rand3 * 10000) / 10000
|
||||
}
|
||||
result_ids.append(
|
||||
f._remote(
|
||||
[False, resource_set],
|
||||
num_cpus=rand1,
|
||||
num_gpus=rand2,
|
||||
resources={"Custom": rand3}))
|
||||
result_ids.append(
|
||||
f._remote(
|
||||
[True, resource_set],
|
||||
num_cpus=rand1,
|
||||
num_gpus=rand2,
|
||||
resources={"Custom": rand3}))
|
||||
assert all(ray.get(result_ids))
|
||||
|
||||
# Check that the available resources at the end are the same as the
|
||||
# beginning.
|
||||
stop_time = time.time() + 10
|
||||
correct_available_resources = False
|
||||
while time.time() < stop_time:
|
||||
available_resources = ray.available_resources()
|
||||
if ("CPU" in available_resources
|
||||
and ray.available_resources()["CPU"] == 2.0
|
||||
and "GPU" in available_resources
|
||||
and ray.available_resources()["GPU"] == 2.0
|
||||
and "Custom" in available_resources
|
||||
and ray.available_resources()["Custom"] == 2.0):
|
||||
correct_available_resources = True
|
||||
break
|
||||
if not correct_available_resources:
|
||||
assert False, "Did not get correct available resources."
|
||||
|
||||
|
||||
def test_background_tasks_with_max_calls(shutdown_only):
|
||||
ray.init(num_cpus=2)
|
||||
|
||||
@ray.remote
|
||||
def g():
|
||||
time.sleep(.1)
|
||||
return 0
|
||||
|
||||
@ray.remote(max_calls=1, max_retries=0)
|
||||
def f():
|
||||
return [g.remote()]
|
||||
|
||||
nested = ray.get([f.remote() for _ in range(10)])
|
||||
|
||||
# Should still be able to retrieve these objects, since f's workers will
|
||||
# wait for g to finish before exiting.
|
||||
ray.get([x[0] for x in nested])
|
||||
|
||||
@ray.remote(max_calls=1, max_retries=0)
|
||||
def f():
|
||||
return os.getpid(), g.remote()
|
||||
|
||||
nested = ray.get([f.remote() for _ in range(10)])
|
||||
while nested:
|
||||
pid, g_id = nested.pop(0)
|
||||
ray.get(g_id)
|
||||
del g_id
|
||||
wait_for_pid_to_exit(pid)
|
||||
|
||||
|
||||
@pytest.mark.skipif(sys.platform == "win32", reason="Failing on Windows.")
|
||||
def test_fair_queueing(shutdown_only):
|
||||
ray.init(num_cpus=1, _system_config={"fair_queueing_enabled": 1})
|
||||
|
||||
@ray.remote
|
||||
def h():
|
||||
return 0
|
||||
|
||||
@ray.remote
|
||||
def g():
|
||||
return ray.get(h.remote())
|
||||
|
||||
@ray.remote
|
||||
def f():
|
||||
return ray.get(g.remote())
|
||||
|
||||
# This will never finish without fair queueing of {f, g, h}:
|
||||
# https://github.com/ray-project/ray/issues/3644
|
||||
ready, _ = ray.wait(
|
||||
[f.remote() for _ in range(1000)], timeout=60.0, num_returns=1000)
|
||||
assert len(ready) == 1000, len(ready)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(pytest.main(["-v", __file__]))
|
||||
Reference in New Issue
Block a user