diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index bb99cdbbb..627300847 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -148,6 +148,15 @@ py_test( deps = ["//:ray_lib"], ) +py_test( + name = "test_failure_direct", + size = "medium", + srcs = ["test_failure_direct.py", "test_failure.py"], + tags = ["exclusive"], + deps = ["//:ray_lib"], + flaky = 1, +) + py_test( name = "test_garbage_collection", size = "medium", diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index 3d7466e32..c6c1f0a50 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -23,6 +23,8 @@ from ray.test_utils import ( RayTestTimeoutException, ) +RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT")) + def test_failed_task(ray_start_regular): @ray.remote @@ -302,7 +304,6 @@ def test_worker_raising_exception(ray_start_regular): f.remote() wait_for_errors(ray_constants.WORKER_CRASH_PUSH_ERROR, 1) - wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1) def test_worker_dying(ray_start_regular): @@ -540,6 +541,7 @@ def test_export_large_objects(ray_start_regular): wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 2) +@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO detect resource deadlock") def test_warning_for_resource_deadlock(shutdown_only): # Check that we get warning messages for infeasible tasks. ray.init(num_cpus=1) @@ -879,6 +881,9 @@ def test_fill_object_store_exception(ray_start_cluster_head): ray.put(np.zeros(10**8 + 2, dtype=np.uint8)) +@pytest.mark.skipif( + not RAY_FORCE_DIRECT, + reason="raylet path attempts reconstruction for evicted objects") @pytest.mark.parametrize( "ray_start_cluster", [{ "num_nodes": 1, @@ -893,8 +898,6 @@ def test_direct_call_eviction(ray_start_cluster): def large_object(): return np.zeros(10 * 1024 * 1024) - large_object = large_object.options(is_direct_call=True) - obj = large_object.remote() assert (isinstance(ray.get(obj), np.ndarray)) # Evict the object. @@ -909,14 +912,15 @@ def test_direct_call_eviction(ray_start_cluster): def dependent_task(x): return - dependent_task = dependent_task.options(is_direct_call=True) - # If the object is passed by reference, the task throws an # exception. with pytest.raises(ray.exceptions.RayTaskError): ray.get(dependent_task.remote(obj)) +@pytest.mark.skipif( + not RAY_FORCE_DIRECT, + reason="raylet path attempts reconstruction for evicted objects") @pytest.mark.parametrize( "ray_start_cluster", [{ "num_nodes": 1, @@ -944,9 +948,6 @@ def test_direct_call_serialized_id_eviction(ray_start_cluster): ray.get(obj_id) print("get done", obj_ids) - large_object = large_object.options(is_direct_call=True) - get = get.options(is_direct_call=True) - obj = large_object.remote() ray.get(get.remote([obj])) @@ -960,7 +961,7 @@ def test_direct_call_serialized_id_eviction(ray_start_cluster): "num_cpus": 2, }], indirect=True) -def test_direct_call_serialized_id(ray_start_cluster): +def test_serialized_id(ray_start_cluster): @ray.remote def small_object(): # Sleep a bit before creating the object to force a timeout @@ -981,10 +982,6 @@ def test_direct_call_serialized_id(ray_start_cluster): else: assert ray.get(obj_id) == 1 - small_object = small_object.options(is_direct_call=True) - dependent_task = dependent_task.options(is_direct_call=True) - get = get.options(is_direct_call=True) - obj = small_object.remote() ray.get(get.remote([obj], False)) diff --git a/python/ray/tests/test_failure_direct.py b/python/ray/tests/test_failure_direct.py new file mode 100644 index 000000000..7e635d1df --- /dev/null +++ b/python/ray/tests/test_failure_direct.py @@ -0,0 +1,16 @@ +"""Wrapper script that sets RAY_FORCE_DIRECT.""" + +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import pytest +import sys +import os + +if __name__ == "__main__": + os.environ["RAY_FORCE_DIRECT"] = "1" + sys.exit( + pytest.main( + ["-v", + os.path.join(os.path.dirname(__file__), "test_failure.py")]))