mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 02:01:24 +08:00
Turn on direct calls for test_failure.py (#6291)
This commit is contained in:
@@ -148,6 +148,15 @@ py_test(
|
||||
deps = ["//:ray_lib"],
|
||||
)
|
||||
|
||||
py_test(
|
||||
name = "test_failure_direct",
|
||||
size = "medium",
|
||||
srcs = ["test_failure_direct.py", "test_failure.py"],
|
||||
tags = ["exclusive"],
|
||||
deps = ["//:ray_lib"],
|
||||
flaky = 1,
|
||||
)
|
||||
|
||||
py_test(
|
||||
name = "test_garbage_collection",
|
||||
size = "medium",
|
||||
|
||||
@@ -23,6 +23,8 @@ from ray.test_utils import (
|
||||
RayTestTimeoutException,
|
||||
)
|
||||
|
||||
RAY_FORCE_DIRECT = bool(os.environ.get("RAY_FORCE_DIRECT"))
|
||||
|
||||
|
||||
def test_failed_task(ray_start_regular):
|
||||
@ray.remote
|
||||
@@ -302,7 +304,6 @@ def test_worker_raising_exception(ray_start_regular):
|
||||
f.remote()
|
||||
|
||||
wait_for_errors(ray_constants.WORKER_CRASH_PUSH_ERROR, 1)
|
||||
wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1)
|
||||
|
||||
|
||||
def test_worker_dying(ray_start_regular):
|
||||
@@ -540,6 +541,7 @@ def test_export_large_objects(ray_start_regular):
|
||||
wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 2)
|
||||
|
||||
|
||||
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO detect resource deadlock")
|
||||
def test_warning_for_resource_deadlock(shutdown_only):
|
||||
# Check that we get warning messages for infeasible tasks.
|
||||
ray.init(num_cpus=1)
|
||||
@@ -879,6 +881,9 @@ def test_fill_object_store_exception(ray_start_cluster_head):
|
||||
ray.put(np.zeros(10**8 + 2, dtype=np.uint8))
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not RAY_FORCE_DIRECT,
|
||||
reason="raylet path attempts reconstruction for evicted objects")
|
||||
@pytest.mark.parametrize(
|
||||
"ray_start_cluster", [{
|
||||
"num_nodes": 1,
|
||||
@@ -893,8 +898,6 @@ def test_direct_call_eviction(ray_start_cluster):
|
||||
def large_object():
|
||||
return np.zeros(10 * 1024 * 1024)
|
||||
|
||||
large_object = large_object.options(is_direct_call=True)
|
||||
|
||||
obj = large_object.remote()
|
||||
assert (isinstance(ray.get(obj), np.ndarray))
|
||||
# Evict the object.
|
||||
@@ -909,14 +912,15 @@ def test_direct_call_eviction(ray_start_cluster):
|
||||
def dependent_task(x):
|
||||
return
|
||||
|
||||
dependent_task = dependent_task.options(is_direct_call=True)
|
||||
|
||||
# If the object is passed by reference, the task throws an
|
||||
# exception.
|
||||
with pytest.raises(ray.exceptions.RayTaskError):
|
||||
ray.get(dependent_task.remote(obj))
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not RAY_FORCE_DIRECT,
|
||||
reason="raylet path attempts reconstruction for evicted objects")
|
||||
@pytest.mark.parametrize(
|
||||
"ray_start_cluster", [{
|
||||
"num_nodes": 1,
|
||||
@@ -944,9 +948,6 @@ def test_direct_call_serialized_id_eviction(ray_start_cluster):
|
||||
ray.get(obj_id)
|
||||
print("get done", obj_ids)
|
||||
|
||||
large_object = large_object.options(is_direct_call=True)
|
||||
get = get.options(is_direct_call=True)
|
||||
|
||||
obj = large_object.remote()
|
||||
ray.get(get.remote([obj]))
|
||||
|
||||
@@ -960,7 +961,7 @@ def test_direct_call_serialized_id_eviction(ray_start_cluster):
|
||||
"num_cpus": 2,
|
||||
}],
|
||||
indirect=True)
|
||||
def test_direct_call_serialized_id(ray_start_cluster):
|
||||
def test_serialized_id(ray_start_cluster):
|
||||
@ray.remote
|
||||
def small_object():
|
||||
# Sleep a bit before creating the object to force a timeout
|
||||
@@ -981,10 +982,6 @@ def test_direct_call_serialized_id(ray_start_cluster):
|
||||
else:
|
||||
assert ray.get(obj_id) == 1
|
||||
|
||||
small_object = small_object.options(is_direct_call=True)
|
||||
dependent_task = dependent_task.options(is_direct_call=True)
|
||||
get = get.options(is_direct_call=True)
|
||||
|
||||
obj = small_object.remote()
|
||||
ray.get(get.remote([obj], False))
|
||||
|
||||
|
||||
@@ -0,0 +1,16 @@
|
||||
"""Wrapper script that sets RAY_FORCE_DIRECT."""
|
||||
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import print_function
|
||||
|
||||
import pytest
|
||||
import sys
|
||||
import os
|
||||
|
||||
if __name__ == "__main__":
|
||||
os.environ["RAY_FORCE_DIRECT"] = "1"
|
||||
sys.exit(
|
||||
pytest.main(
|
||||
["-v",
|
||||
os.path.join(os.path.dirname(__file__), "test_failure.py")]))
|
||||
Reference in New Issue
Block a user