Set RAY_FORCE_DIRECT=1 for run_rllib_tests, test_basic (#6171)

This commit is contained in:
Eric Liang
2019-11-25 14:12:11 -08:00
committed by GitHub
parent c9314098b9
commit 64a3a7239e
16 changed files with 229 additions and 123 deletions
+18 -2
View File
@@ -1,11 +1,19 @@
py_test(
name = "test_actor",
size = "large",
size = "medium",
srcs = ["test_actor.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_actor_direct",
size = "medium",
srcs = ["test_actor_direct.py", "test_actor.py"],
tags = ["exclusive", "manual"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_actor_resources",
size = "large",
@@ -24,12 +32,20 @@ py_test(
py_test(
name = "test_basic",
size = "large",
size = "medium",
srcs = ["test_basic.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_basic_direct",
size = "medium",
srcs = ["test_basic_direct.py", "test_basic.py"],
tags = ["exclusive"],
deps = ["//:ray_lib"],
)
py_test(
name = "test_advanced",
size = "large",
+8
View File
@@ -18,6 +18,8 @@ import ray.test_utils
import ray.cluster_utils
from ray.test_utils import run_string_as_driver
RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT"))
def test_actor_init_error_propagated(ray_start_regular):
@ray.remote
@@ -807,6 +809,7 @@ def test_exception_raised_when_actor_node_dies(ray_start_cluster_head):
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Hanging with new GCS API.")
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no ft yet")
def test_actor_init_fails(ray_start_cluster_head):
cluster = ray_start_cluster_head
remote_node = cluster.add_node()
@@ -832,6 +835,7 @@ def test_actor_init_fails(ray_start_cluster_head):
assert results == [1 for actor in actors]
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no ft yet")
def test_reconstruction_suppression(ray_start_cluster_head):
cluster = ray_start_cluster_head
num_nodes = 5
@@ -1148,6 +1152,7 @@ def setup_queue_actor():
ray.shutdown()
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock")
def test_fork(setup_queue_actor):
queue = setup_queue_actor
@@ -1166,6 +1171,7 @@ def test_fork(setup_queue_actor):
assert filtered_items == list(range(1))
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock")
def test_fork_consistency(setup_queue_actor):
queue = setup_queue_actor
@@ -1197,6 +1203,7 @@ def test_fork_consistency(setup_queue_actor):
assert filtered_items == list(range(num_items_per_fork))
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock")
def test_pickled_handle_consistency(setup_queue_actor):
queue = setup_queue_actor
@@ -1230,6 +1237,7 @@ def test_pickled_handle_consistency(setup_queue_actor):
assert filtered_items == list(range(num_items_per_fork))
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO support block/unblock")
def test_nested_fork(setup_queue_actor):
queue = setup_queue_actor
+16
View File
@@ -0,0 +1,16 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
os.environ["RAY_FORCE_DIRECT"] = "1"
sys.exit(
pytest.main(
["-v",
os.path.join(os.path.dirname(__file__), "test_actor.py")]))
+27
View File
@@ -5,6 +5,7 @@ from __future__ import print_function
import collections
import io
import os
import json
import logging
import re
@@ -23,6 +24,8 @@ import ray.test_utils
logger = logging.getLogger(__name__)
RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT"))
def test_simple_serialization(ray_start_regular):
primitive_objects = [
@@ -91,6 +94,7 @@ def test_simple_serialization(ray_start_regular):
assert type(obj) == type(new_obj_2)
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="resource shape not implemented")
def test_fair_queueing(shutdown_only):
ray.init(
num_cpus=1, _internal_config=json.dumps({
@@ -1025,6 +1029,7 @@ def test_defining_remote_functions(shutdown_only):
assert ray.get(m.remote(1)) == 2
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="reconstruction not implemented")
def test_submit_api(shutdown_only):
ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1})
@@ -1083,6 +1088,7 @@ def test_submit_api(shutdown_only):
assert ray.get([id1, id2, id3, id4]) == [0, 1, "test", 2]
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="reconstruction not implemented")
def test_many_fractional_resources(shutdown_only):
ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2})
@@ -1318,6 +1324,27 @@ def test_direct_call_chain(ray_start_cluster):
assert ray.get(x) == 100
def test_direct_inline_arg_memory_corruption(ray_start_regular):
@ray.remote
def f():
return np.zeros(1000, dtype=np.uint8)
@ray.remote
class Actor(object):
def __init__(self):
self.z = []
def add(self, x):
self.z.append(x)
for prev in self.z:
assert np.sum(prev) == 0, ("memory corruption detected", prev)
a = Actor.options(is_direct_call=True).remote()
f_direct = f.options(is_direct_call=True)
for i in range(100):
ray.get(a.add.remote(f_direct.remote()))
def test_direct_actor_enabled(ray_start_regular):
@ray.remote
class Actor(object):
+16
View File
@@ -0,0 +1,16 @@
"""Wrapper script that sets RAY_FORCE_DIRECT."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import pytest
import sys
import os
if __name__ == "__main__":
os.environ["RAY_FORCE_DIRECT"] = "1"
sys.exit(
pytest.main(
["-v",
os.path.join(os.path.dirname(__file__), "test_basic.py")]))