mirror of
https://github.com/wassname/ray.git
synced 2026-06-28 01:46:10 +08:00
269e1f0b98
Co-authored-by: 刘宝 <po.lb@antfin.com>
1231 lines
38 KiB
Python
1231 lines
38 KiB
Python
import logging
|
|
import os
|
|
import sys
|
|
import tempfile
|
|
import threading
|
|
import time
|
|
|
|
import numpy as np
|
|
import pytest
|
|
import redis
|
|
|
|
import ray
|
|
import ray.utils
|
|
import ray.ray_constants as ray_constants
|
|
from ray.exceptions import RayTaskError
|
|
from ray.cluster_utils import Cluster
|
|
from ray.test_utils import (
|
|
wait_for_condition,
|
|
SignalActor,
|
|
init_error_pubsub,
|
|
get_error_message,
|
|
)
|
|
|
|
|
|
def test_failed_task(ray_start_regular, error_pubsub):
|
|
@ray.remote
|
|
def throw_exception_fct1():
|
|
raise Exception("Test function 1 intentionally failed.")
|
|
|
|
@ray.remote
|
|
def throw_exception_fct2():
|
|
raise Exception("Test function 2 intentionally failed.")
|
|
|
|
@ray.remote(num_returns=3)
|
|
def throw_exception_fct3(x):
|
|
raise Exception("Test function 3 intentionally failed.")
|
|
|
|
p = error_pubsub
|
|
|
|
throw_exception_fct1.remote()
|
|
throw_exception_fct1.remote()
|
|
|
|
msgs = get_error_message(p, 2, ray_constants.TASK_PUSH_ERROR)
|
|
assert len(msgs) == 2
|
|
for msg in msgs:
|
|
assert "Test function 1 intentionally failed." in msg.error_message
|
|
|
|
x = throw_exception_fct2.remote()
|
|
try:
|
|
ray.get(x)
|
|
except Exception as e:
|
|
assert "Test function 2 intentionally failed." in str(e)
|
|
else:
|
|
# ray.get should throw an exception.
|
|
assert False
|
|
|
|
x, y, z = throw_exception_fct3.remote(1.0)
|
|
for ref in [x, y, z]:
|
|
try:
|
|
ray.get(ref)
|
|
except Exception as e:
|
|
assert "Test function 3 intentionally failed." in str(e)
|
|
else:
|
|
# ray.get should throw an exception.
|
|
assert False
|
|
|
|
class CustomException(ValueError):
|
|
pass
|
|
|
|
@ray.remote
|
|
def f():
|
|
raise CustomException("This function failed.")
|
|
|
|
try:
|
|
ray.get(f.remote())
|
|
except Exception as e:
|
|
assert "This function failed." in str(e)
|
|
assert isinstance(e, CustomException)
|
|
assert isinstance(e, ray.exceptions.RayTaskError)
|
|
assert "RayTaskError(CustomException)" in repr(e)
|
|
else:
|
|
# ray.get should throw an exception.
|
|
assert False
|
|
|
|
|
|
def test_push_error_to_driver_through_redis(ray_start_regular, error_pubsub):
|
|
address_info = ray_start_regular
|
|
address = address_info["redis_address"]
|
|
redis_client = ray.services.create_redis_client(
|
|
address, password=ray.ray_constants.REDIS_DEFAULT_PASSWORD)
|
|
error_message = "Test error message"
|
|
ray.utils.push_error_to_driver_through_redis(
|
|
redis_client, ray_constants.DASHBOARD_AGENT_DIED_ERROR, error_message)
|
|
errors = get_error_message(error_pubsub, 1,
|
|
ray_constants.DASHBOARD_AGENT_DIED_ERROR)
|
|
assert errors[0].type == ray_constants.DASHBOARD_AGENT_DIED_ERROR
|
|
assert errors[0].error_message == error_message
|
|
|
|
|
|
def test_get_throws_quickly_when_found_exception(ray_start_regular):
|
|
# We use an actor instead of functions here. If we use functions, it's
|
|
# very likely that two normal tasks are submitted before the first worker
|
|
# is registered to Raylet. Since `maximum_startup_concurrency` is 1,
|
|
# the worker pool will wait for the registration of the first worker
|
|
# and skip starting new workers. The result is, the two tasks will be
|
|
# executed sequentially, which breaks an assumption of this test case -
|
|
# the two tasks run in parallel.
|
|
@ray.remote
|
|
class Actor(object):
|
|
def bad_func1(self):
|
|
raise Exception("Test function intentionally failed.")
|
|
|
|
def bad_func2(self):
|
|
os._exit(0)
|
|
|
|
def slow_func(self, signal):
|
|
ray.get(signal.wait.remote())
|
|
|
|
def expect_exception(objects, exception):
|
|
with pytest.raises(ray.exceptions.RayError) as err:
|
|
ray.get(objects)
|
|
assert err.type is exception
|
|
|
|
signal1 = SignalActor.remote()
|
|
actor = Actor.options(max_concurrency=2).remote()
|
|
expect_exception(
|
|
[actor.bad_func1.remote(),
|
|
actor.slow_func.remote(signal1)], ray.exceptions.RayTaskError)
|
|
ray.get(signal1.send.remote())
|
|
|
|
signal2 = SignalActor.remote()
|
|
actor = Actor.options(max_concurrency=2).remote()
|
|
expect_exception(
|
|
[actor.bad_func2.remote(),
|
|
actor.slow_func.remote(signal2)], ray.exceptions.RayActorError)
|
|
ray.get(signal2.send.remote())
|
|
|
|
|
|
def test_fail_importing_remote_function(ray_start_2_cpus, error_pubsub):
|
|
p = error_pubsub
|
|
# Create the contents of a temporary Python file.
|
|
temporary_python_file = """
|
|
def temporary_helper_function():
|
|
return 1
|
|
"""
|
|
|
|
f = tempfile.NamedTemporaryFile(suffix=".py")
|
|
f.write(temporary_python_file.encode("ascii"))
|
|
f.flush()
|
|
directory = os.path.dirname(f.name)
|
|
# Get the module name and strip ".py" from the end.
|
|
module_name = os.path.basename(f.name)[:-3]
|
|
sys.path.append(directory)
|
|
module = __import__(module_name)
|
|
|
|
# Define a function that closes over this temporary module. This should
|
|
# fail when it is unpickled.
|
|
@ray.remote
|
|
def g(x, y=3):
|
|
try:
|
|
module.temporary_python_file()
|
|
except Exception:
|
|
# This test is not concerned with the error from running this
|
|
# function. Only from unpickling the remote function.
|
|
pass
|
|
|
|
# Invoke the function so that the definition is exported.
|
|
g.remote(1, y=2)
|
|
|
|
errors = get_error_message(
|
|
p, 2, ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR)
|
|
assert errors[0].type == ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR
|
|
assert "No module named" in errors[0].error_message
|
|
assert "No module named" in errors[1].error_message
|
|
|
|
# Check that if we try to call the function it throws an exception and
|
|
# does not hang.
|
|
for _ in range(10):
|
|
with pytest.raises(
|
|
Exception, match="This function was not imported properly."):
|
|
ray.get(g.remote(1, y=2))
|
|
|
|
f.close()
|
|
# Clean up the junk we added to sys.path.
|
|
sys.path.pop(-1)
|
|
|
|
|
|
def test_failed_function_to_run(ray_start_2_cpus, error_pubsub):
|
|
p = error_pubsub
|
|
|
|
def f(worker):
|
|
if ray.worker.global_worker.mode == ray.WORKER_MODE:
|
|
raise Exception("Function to run failed.")
|
|
|
|
ray.worker.global_worker.run_function_on_all_workers(f)
|
|
# Check that the error message is in the task info.
|
|
errors = get_error_message(p, 2, ray_constants.FUNCTION_TO_RUN_PUSH_ERROR)
|
|
assert len(errors) == 2
|
|
assert errors[0].type == ray_constants.FUNCTION_TO_RUN_PUSH_ERROR
|
|
assert "Function to run failed." in errors[0].error_message
|
|
assert "Function to run failed." in errors[1].error_message
|
|
|
|
|
|
def test_fail_importing_actor(ray_start_regular, error_pubsub):
|
|
p = error_pubsub
|
|
# Create the contents of a temporary Python file.
|
|
temporary_python_file = """
|
|
def temporary_helper_function():
|
|
return 1
|
|
"""
|
|
|
|
f = tempfile.NamedTemporaryFile(suffix=".py")
|
|
f.write(temporary_python_file.encode("ascii"))
|
|
f.flush()
|
|
directory = os.path.dirname(f.name)
|
|
# Get the module name and strip ".py" from the end.
|
|
module_name = os.path.basename(f.name)[:-3]
|
|
sys.path.append(directory)
|
|
module = __import__(module_name)
|
|
|
|
# Define an actor that closes over this temporary module. This should
|
|
# fail when it is unpickled.
|
|
@ray.remote
|
|
class Foo:
|
|
def __init__(self, arg1, arg2=3):
|
|
self.x = module.temporary_python_file()
|
|
|
|
def get_val(self, arg1, arg2=3):
|
|
return 1
|
|
|
|
# There should be no errors yet.
|
|
errors = get_error_message(p, 2)
|
|
assert len(errors) == 0
|
|
# Create an actor.
|
|
foo = Foo.remote(3, arg2=0)
|
|
|
|
errors = get_error_message(p, 2)
|
|
assert len(errors) == 2
|
|
|
|
for error in errors:
|
|
# Wait for the error to arrive.
|
|
if error.type == ray_constants.REGISTER_ACTOR_PUSH_ERROR:
|
|
assert "No module named" in error.error_message
|
|
else:
|
|
# Wait for the error from when the __init__ tries to run.
|
|
assert ("failed to be imported, and so cannot execute this method"
|
|
in error.error_message)
|
|
|
|
# Check that if we try to get the function it throws an exception and
|
|
# does not hang.
|
|
with pytest.raises(Exception, match="failed to be imported"):
|
|
ray.get(foo.get_val.remote(1, arg2=2))
|
|
|
|
# Wait for the error from when the call to get_val.
|
|
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.TASK_PUSH_ERROR
|
|
assert ("failed to be imported, and so cannot execute this method" in
|
|
errors[0].error_message)
|
|
|
|
f.close()
|
|
|
|
# Clean up the junk we added to sys.path.
|
|
sys.path.pop(-1)
|
|
|
|
|
|
def test_failed_actor_init(ray_start_regular, error_pubsub):
|
|
p = error_pubsub
|
|
error_message1 = "actor constructor failed"
|
|
error_message2 = "actor method failed"
|
|
|
|
@ray.remote
|
|
class FailedActor:
|
|
def __init__(self):
|
|
raise Exception(error_message1)
|
|
|
|
def fail_method(self):
|
|
raise Exception(error_message2)
|
|
|
|
a = FailedActor.remote()
|
|
|
|
# Make sure that we get errors from a failed constructor.
|
|
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.TASK_PUSH_ERROR
|
|
assert error_message1 in errors[0].error_message
|
|
|
|
# Make sure that we get errors from a failed method.
|
|
a.fail_method.remote()
|
|
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.TASK_PUSH_ERROR
|
|
assert error_message1 in errors[0].error_message
|
|
|
|
|
|
def test_failed_actor_method(ray_start_regular, error_pubsub):
|
|
p = error_pubsub
|
|
error_message2 = "actor method failed"
|
|
|
|
@ray.remote
|
|
class FailedActor:
|
|
def __init__(self):
|
|
pass
|
|
|
|
def fail_method(self):
|
|
raise Exception(error_message2)
|
|
|
|
a = FailedActor.remote()
|
|
|
|
# Make sure that we get errors from a failed method.
|
|
a.fail_method.remote()
|
|
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.TASK_PUSH_ERROR
|
|
assert error_message2 in errors[0].error_message
|
|
|
|
|
|
def test_incorrect_method_calls(ray_start_regular):
|
|
@ray.remote
|
|
class Actor:
|
|
def __init__(self, missing_variable_name):
|
|
pass
|
|
|
|
def get_val(self, x):
|
|
pass
|
|
|
|
# Make sure that we get errors if we call the constructor incorrectly.
|
|
|
|
# Create an actor with too few arguments.
|
|
with pytest.raises(Exception):
|
|
a = Actor.remote()
|
|
|
|
# Create an actor with too many arguments.
|
|
with pytest.raises(Exception):
|
|
a = Actor.remote(1, 2)
|
|
|
|
# Create an actor the correct number of arguments.
|
|
a = Actor.remote(1)
|
|
|
|
# Call a method with too few arguments.
|
|
with pytest.raises(Exception):
|
|
a.get_val.remote()
|
|
|
|
# Call a method with too many arguments.
|
|
with pytest.raises(Exception):
|
|
a.get_val.remote(1, 2)
|
|
# Call a method that doesn't exist.
|
|
with pytest.raises(AttributeError):
|
|
a.nonexistent_method()
|
|
with pytest.raises(AttributeError):
|
|
a.nonexistent_method.remote()
|
|
|
|
|
|
def test_worker_raising_exception(ray_start_regular, error_pubsub):
|
|
p = error_pubsub
|
|
|
|
@ray.remote(max_calls=2)
|
|
def f():
|
|
# This is the only reasonable variable we can set here that makes the
|
|
# execute_task function fail after the task got executed.
|
|
worker = ray.worker.global_worker
|
|
worker.function_actor_manager.increase_task_counter = None
|
|
|
|
# Running this task should cause the worker to raise an exception after
|
|
# the task has successfully completed.
|
|
f.remote()
|
|
errors = get_error_message(p, 1, ray_constants.WORKER_CRASH_PUSH_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.WORKER_CRASH_PUSH_ERROR
|
|
|
|
|
|
def test_worker_dying(ray_start_regular, error_pubsub):
|
|
p = error_pubsub
|
|
# Define a remote function that will kill the worker that runs it.
|
|
|
|
@ray.remote(max_retries=0)
|
|
def f():
|
|
eval("exit()")
|
|
|
|
with pytest.raises(ray.exceptions.WorkerCrashedError):
|
|
ray.get(f.remote())
|
|
|
|
errors = get_error_message(p, 1, ray_constants.WORKER_DIED_PUSH_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.WORKER_DIED_PUSH_ERROR
|
|
assert "died or was killed while executing" in errors[0].error_message
|
|
|
|
|
|
def test_actor_worker_dying(ray_start_regular, error_pubsub):
|
|
p = error_pubsub
|
|
|
|
@ray.remote
|
|
class Actor:
|
|
def kill(self):
|
|
eval("exit()")
|
|
|
|
@ray.remote
|
|
def consume(x):
|
|
pass
|
|
|
|
a = Actor.remote()
|
|
[obj], _ = ray.wait([a.kill.remote()], timeout=5)
|
|
with pytest.raises(ray.exceptions.RayActorError):
|
|
ray.get(obj)
|
|
with pytest.raises(ray.exceptions.RayTaskError):
|
|
ray.get(consume.remote(obj))
|
|
errors = get_error_message(p, 1, ray_constants.WORKER_DIED_PUSH_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.WORKER_DIED_PUSH_ERROR
|
|
|
|
|
|
def test_actor_worker_dying_future_tasks(ray_start_regular, error_pubsub):
|
|
p = error_pubsub
|
|
|
|
@ray.remote(max_restarts=0)
|
|
class Actor:
|
|
def getpid(self):
|
|
return os.getpid()
|
|
|
|
def sleep(self):
|
|
time.sleep(1)
|
|
|
|
a = Actor.remote()
|
|
pid = ray.get(a.getpid.remote())
|
|
tasks1 = [a.sleep.remote() for _ in range(10)]
|
|
os.kill(pid, 9)
|
|
time.sleep(0.1)
|
|
tasks2 = [a.sleep.remote() for _ in range(10)]
|
|
for obj in tasks1 + tasks2:
|
|
with pytest.raises(Exception):
|
|
ray.get(obj)
|
|
|
|
errors = get_error_message(p, 1, ray_constants.WORKER_DIED_PUSH_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.WORKER_DIED_PUSH_ERROR
|
|
|
|
|
|
def test_actor_worker_dying_nothing_in_progress(ray_start_regular):
|
|
@ray.remote(max_restarts=0)
|
|
class Actor:
|
|
def getpid(self):
|
|
return os.getpid()
|
|
|
|
a = Actor.remote()
|
|
pid = ray.get(a.getpid.remote())
|
|
os.kill(pid, 9)
|
|
time.sleep(0.1)
|
|
task2 = a.getpid.remote()
|
|
with pytest.raises(Exception):
|
|
ray.get(task2)
|
|
|
|
|
|
def test_actor_scope_or_intentionally_killed_message(ray_start_regular,
|
|
error_pubsub):
|
|
p = error_pubsub
|
|
|
|
@ray.remote
|
|
class Actor:
|
|
pass
|
|
|
|
a = Actor.remote()
|
|
a = Actor.remote()
|
|
a.__ray_terminate__.remote()
|
|
time.sleep(1)
|
|
errors = get_error_message(p, 1)
|
|
assert len(errors) == 0, "Should not have propogated an error - {}".format(
|
|
errors)
|
|
|
|
|
|
def test_exception_chain(ray_start_regular):
|
|
@ray.remote
|
|
def bar():
|
|
return 1 / 0
|
|
|
|
@ray.remote
|
|
def foo():
|
|
return ray.get(bar.remote())
|
|
|
|
r = foo.remote()
|
|
try:
|
|
ray.get(r)
|
|
except ZeroDivisionError as ex:
|
|
assert isinstance(ex, RayTaskError)
|
|
|
|
|
|
@pytest.mark.skip("This test does not work yet.")
|
|
@pytest.mark.parametrize(
|
|
"ray_start_object_store_memory", [10**6], indirect=True)
|
|
def test_put_error1(ray_start_object_store_memory, error_pubsub):
|
|
p = error_pubsub
|
|
num_objects = 3
|
|
object_size = 4 * 10**5
|
|
|
|
# Define a task with a single dependency, a numpy array, that returns
|
|
# another array.
|
|
@ray.remote
|
|
def single_dependency(i, arg):
|
|
arg = np.copy(arg)
|
|
arg[0] = i
|
|
return arg
|
|
|
|
@ray.remote
|
|
def put_arg_task():
|
|
# Launch num_objects instances of the remote task, each dependent
|
|
# on the one before it. The result of the first task should get
|
|
# evicted.
|
|
args = []
|
|
arg = single_dependency.remote(0, np.zeros(
|
|
object_size, dtype=np.uint8))
|
|
for i in range(num_objects):
|
|
arg = single_dependency.remote(i, arg)
|
|
args.append(arg)
|
|
|
|
# Get the last value to force all tasks to finish.
|
|
value = ray.get(args[-1])
|
|
assert value[0] == i
|
|
|
|
# Get the first value (which should have been evicted) to force
|
|
# reconstruction. Currently, since we're not able to reconstruct
|
|
# `ray.put` objects that were evicted and whose originating tasks
|
|
# are still running, this for-loop should hang and push an error to
|
|
# the driver.
|
|
ray.get(args[0])
|
|
|
|
put_arg_task.remote()
|
|
|
|
# Make sure we receive the correct error message.
|
|
errors = get_error_message(p, 1,
|
|
ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR
|
|
|
|
|
|
@pytest.mark.skip("This test does not work yet.")
|
|
@pytest.mark.parametrize(
|
|
"ray_start_object_store_memory", [10**6], indirect=True)
|
|
def test_put_error2(ray_start_object_store_memory):
|
|
# This is the same as the previous test, but it calls ray.put directly.
|
|
num_objects = 3
|
|
object_size = 4 * 10**5
|
|
|
|
# Define a task with a single dependency, a numpy array, that returns
|
|
# another array.
|
|
@ray.remote
|
|
def single_dependency(i, arg):
|
|
arg = np.copy(arg)
|
|
arg[0] = i
|
|
return arg
|
|
|
|
@ray.remote
|
|
def put_task():
|
|
# Launch num_objects instances of the remote task, each dependent
|
|
# on the one before it. The result of the first task should get
|
|
# evicted.
|
|
args = []
|
|
arg = ray.put(np.zeros(object_size, dtype=np.uint8))
|
|
for i in range(num_objects):
|
|
arg = single_dependency.remote(i, arg)
|
|
args.append(arg)
|
|
|
|
# Get the last value to force all tasks to finish.
|
|
value = ray.get(args[-1])
|
|
assert value[0] == i
|
|
|
|
# Get the first value (which should have been evicted) to force
|
|
# reconstruction. Currently, since we're not able to reconstruct
|
|
# `ray.put` objects that were evicted and whose originating tasks
|
|
# are still running, this for-loop should hang and push an error to
|
|
# the driver.
|
|
ray.get(args[0])
|
|
|
|
put_task.remote()
|
|
|
|
# Make sure we receive the correct error message.
|
|
# get_error_message(ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR, 1)
|
|
|
|
|
|
@pytest.mark.skip("Publish happeds before we subscribe it")
|
|
def test_version_mismatch(error_pubsub, shutdown_only):
|
|
ray_version = ray.__version__
|
|
ray.__version__ = "fake ray version"
|
|
|
|
ray.init(num_cpus=1)
|
|
p = error_pubsub
|
|
|
|
errors = get_error_message(p, 1, ray_constants.VERSION_MISMATCH_PUSH_ERROR)
|
|
assert False, errors
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.VERSION_MISMATCH_PUSH_ERROR
|
|
|
|
# Reset the version.
|
|
ray.__version__ = ray_version
|
|
|
|
|
|
def test_export_large_objects(ray_start_regular, error_pubsub):
|
|
p = error_pubsub
|
|
import ray.ray_constants as ray_constants
|
|
|
|
large_object = np.zeros(2 * ray_constants.PICKLE_OBJECT_WARNING_SIZE)
|
|
|
|
@ray.remote
|
|
def f():
|
|
large_object
|
|
|
|
# Invoke the function so that the definition is exported.
|
|
f.remote()
|
|
|
|
# Make sure that a warning is generated.
|
|
errors = get_error_message(p, 1,
|
|
ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR
|
|
|
|
@ray.remote
|
|
class Foo:
|
|
def __init__(self):
|
|
large_object
|
|
|
|
Foo.remote()
|
|
|
|
# Make sure that a warning is generated.
|
|
errors = get_error_message(p, 1,
|
|
ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR
|
|
|
|
|
|
@pytest.mark.skip(reason="TODO detect resource deadlock")
|
|
def test_warning_for_resource_deadlock(error_pubsub, shutdown_only):
|
|
p = error_pubsub
|
|
# Check that we get warning messages for infeasible tasks.
|
|
ray.init(num_cpus=1)
|
|
|
|
@ray.remote(num_cpus=1)
|
|
class Foo:
|
|
def f(self):
|
|
return 0
|
|
|
|
@ray.remote
|
|
def f():
|
|
# Creating both actors is not possible.
|
|
actors = [Foo.remote() for _ in range(2)]
|
|
for a in actors:
|
|
ray.get(a.f.remote())
|
|
|
|
# Run in a task to check we handle the blocked task case correctly
|
|
f.remote()
|
|
errors = get_error_message(p, 1, ray_constants.RESOURCE_DEADLOCK_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.RESOURCE_DEADLOCK_ERROR
|
|
|
|
|
|
def test_warning_for_infeasible_tasks(ray_start_regular, error_pubsub):
|
|
p = error_pubsub
|
|
# Check that we get warning messages for infeasible tasks.
|
|
|
|
@ray.remote(num_gpus=1)
|
|
def f():
|
|
pass
|
|
|
|
@ray.remote(resources={"Custom": 1})
|
|
class Foo:
|
|
pass
|
|
|
|
# This task is infeasible.
|
|
f.remote()
|
|
errors = get_error_message(p, 1, ray_constants.INFEASIBLE_TASK_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.INFEASIBLE_TASK_ERROR
|
|
|
|
# This actor placement task is infeasible.
|
|
Foo.remote()
|
|
errors = get_error_message(p, 1, ray_constants.INFEASIBLE_TASK_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.INFEASIBLE_TASK_ERROR
|
|
|
|
|
|
def test_warning_for_infeasible_zero_cpu_actor(shutdown_only):
|
|
# Check that we cannot place an actor on a 0 CPU machine and that we get an
|
|
# infeasibility warning (even though the actor creation task itself
|
|
# requires no CPUs).
|
|
|
|
ray.init(num_cpus=0)
|
|
p = init_error_pubsub()
|
|
|
|
@ray.remote
|
|
class Foo:
|
|
pass
|
|
|
|
# The actor creation should be infeasible.
|
|
Foo.remote()
|
|
errors = get_error_message(p, 1, ray_constants.INFEASIBLE_TASK_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.INFEASIBLE_TASK_ERROR
|
|
p.close()
|
|
|
|
|
|
def test_warning_for_too_many_actors(shutdown_only):
|
|
# Check that if we run a workload which requires too many workers to be
|
|
# started that we will receive a warning.
|
|
num_cpus = 2
|
|
ray.init(num_cpus=num_cpus)
|
|
|
|
p = init_error_pubsub()
|
|
|
|
@ray.remote
|
|
class Foo:
|
|
def __init__(self):
|
|
time.sleep(1000)
|
|
|
|
[Foo.remote() for _ in range(num_cpus * 3)]
|
|
errors = get_error_message(p, 1, ray_constants.WORKER_POOL_LARGE_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.WORKER_POOL_LARGE_ERROR
|
|
|
|
[Foo.remote() for _ in range(num_cpus)]
|
|
errors = get_error_message(p, 1, ray_constants.WORKER_POOL_LARGE_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.WORKER_POOL_LARGE_ERROR
|
|
p.close()
|
|
|
|
|
|
def test_warning_for_too_many_nested_tasks(shutdown_only):
|
|
# Check that if we run a workload which requires too many workers to be
|
|
# started that we will receive a warning.
|
|
num_cpus = 2
|
|
ray.init(num_cpus=num_cpus)
|
|
p = init_error_pubsub()
|
|
|
|
@ray.remote
|
|
def f():
|
|
time.sleep(1000)
|
|
return 1
|
|
|
|
@ray.remote
|
|
def h():
|
|
time.sleep(1)
|
|
ray.get(f.remote())
|
|
|
|
@ray.remote
|
|
def g():
|
|
# Sleep so that the f tasks all get submitted to the scheduler after
|
|
# the g tasks.
|
|
time.sleep(1)
|
|
ray.get(h.remote())
|
|
|
|
[g.remote() for _ in range(num_cpus * 6)]
|
|
errors = get_error_message(p, 1, ray_constants.WORKER_POOL_LARGE_ERROR)
|
|
assert len(errors) == 1
|
|
assert errors[0].type == ray_constants.WORKER_POOL_LARGE_ERROR
|
|
p.close()
|
|
|
|
|
|
def test_warning_for_many_duplicate_remote_functions_and_actors(shutdown_only):
|
|
ray.init(num_cpus=1)
|
|
|
|
@ray.remote
|
|
def create_remote_function():
|
|
@ray.remote
|
|
def g():
|
|
return 1
|
|
|
|
return ray.get(g.remote())
|
|
|
|
for _ in range(ray_constants.DUPLICATE_REMOTE_FUNCTION_THRESHOLD - 1):
|
|
ray.get(create_remote_function.remote())
|
|
|
|
import io
|
|
log_capture_string = io.StringIO()
|
|
ch = logging.StreamHandler(log_capture_string)
|
|
|
|
# TODO(rkn): It's terrible to have to rely on this implementation detail,
|
|
# the fact that the warning comes from ray.import_thread.logger. However,
|
|
# I didn't find a good way to capture the output for all loggers
|
|
# simultaneously.
|
|
ray.import_thread.logger.addHandler(ch)
|
|
|
|
ray.get(create_remote_function.remote())
|
|
|
|
start_time = time.time()
|
|
while time.time() < start_time + 10:
|
|
log_contents = log_capture_string.getvalue()
|
|
if len(log_contents) > 0:
|
|
break
|
|
|
|
ray.import_thread.logger.removeHandler(ch)
|
|
|
|
assert "remote function" in log_contents
|
|
assert "has been exported {} times.".format(
|
|
ray_constants.DUPLICATE_REMOTE_FUNCTION_THRESHOLD) in log_contents
|
|
|
|
# Now test the same thing but for actors.
|
|
|
|
@ray.remote
|
|
def create_actor_class():
|
|
# Require a GPU so that the actor is never actually created and we
|
|
# don't spawn an unreasonable number of processes.
|
|
@ray.remote(num_gpus=1)
|
|
class Foo:
|
|
pass
|
|
|
|
Foo.remote()
|
|
|
|
for _ in range(ray_constants.DUPLICATE_REMOTE_FUNCTION_THRESHOLD - 1):
|
|
ray.get(create_actor_class.remote())
|
|
|
|
log_capture_string = io.StringIO()
|
|
ch = logging.StreamHandler(log_capture_string)
|
|
|
|
# TODO(rkn): As mentioned above, it's terrible to have to rely on this
|
|
# implementation detail.
|
|
ray.import_thread.logger.addHandler(ch)
|
|
|
|
ray.get(create_actor_class.remote())
|
|
|
|
start_time = time.time()
|
|
while time.time() < start_time + 10:
|
|
log_contents = log_capture_string.getvalue()
|
|
if len(log_contents) > 0:
|
|
break
|
|
|
|
ray.import_thread.logger.removeHandler(ch)
|
|
|
|
assert "actor" in log_contents
|
|
assert "has been exported {} times.".format(
|
|
ray_constants.DUPLICATE_REMOTE_FUNCTION_THRESHOLD) in log_contents
|
|
|
|
|
|
def test_redis_module_failure(ray_start_regular):
|
|
address_info = ray_start_regular
|
|
address = address_info["redis_address"]
|
|
address = address.split(":")
|
|
assert len(address) == 2
|
|
|
|
def run_failure_test(expecting_message, *command):
|
|
with pytest.raises(
|
|
Exception, match=".*{}.*".format(expecting_message)):
|
|
client = redis.StrictRedis(
|
|
host=address[0],
|
|
port=int(address[1]),
|
|
password=ray_constants.REDIS_DEFAULT_PASSWORD)
|
|
client.execute_command(*command)
|
|
|
|
def run_one_command(*command):
|
|
client = redis.StrictRedis(
|
|
host=address[0],
|
|
port=int(address[1]),
|
|
password=ray_constants.REDIS_DEFAULT_PASSWORD)
|
|
client.execute_command(*command)
|
|
|
|
run_failure_test("wrong number of arguments", "RAY.TABLE_ADD", 13)
|
|
run_failure_test("Prefix must be in the TablePrefix range",
|
|
"RAY.TABLE_ADD", 100000, 1, 1, 1)
|
|
run_failure_test("Prefix must be in the TablePrefix range",
|
|
"RAY.TABLE_REQUEST_NOTIFICATIONS", 100000, 1, 1, 1)
|
|
run_failure_test("Prefix must be a valid TablePrefix integer",
|
|
"RAY.TABLE_ADD", b"a", 1, 1, 1)
|
|
run_failure_test("Pubsub channel must be in the TablePubsub range",
|
|
"RAY.TABLE_ADD", 1, 10000, 1, 1)
|
|
run_failure_test("Pubsub channel must be a valid integer", "RAY.TABLE_ADD",
|
|
1, b"a", 1, 1)
|
|
# Change the key from 1 to 2, since the previous command should have
|
|
# succeeded at writing the key, but not publishing it.
|
|
run_failure_test("Index is less than 0.", "RAY.TABLE_APPEND", 1, 1, 2, 1,
|
|
-1)
|
|
run_failure_test("Index is not a number.", "RAY.TABLE_APPEND", 1, 1, 2, 1,
|
|
b"a")
|
|
run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1)
|
|
# It's okay to add duplicate entries.
|
|
run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1)
|
|
run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1, 0)
|
|
run_one_command("RAY.TABLE_APPEND", 1, 1, 2, 1, 1)
|
|
run_one_command("RAY.SET_ADD", 1, 1, 3, 1)
|
|
# It's okey to add duplicate entries.
|
|
run_one_command("RAY.SET_ADD", 1, 1, 3, 1)
|
|
run_one_command("RAY.SET_REMOVE", 1, 1, 3, 1)
|
|
# It's okey to remove duplicate entries.
|
|
run_one_command("RAY.SET_REMOVE", 1, 1, 3, 1)
|
|
|
|
|
|
# Note that this test will take at least 10 seconds because it must wait for
|
|
# the monitor to detect enough missed heartbeats.
|
|
def test_warning_for_dead_node(ray_start_cluster_2_nodes, error_pubsub):
|
|
cluster = ray_start_cluster_2_nodes
|
|
cluster.wait_for_nodes()
|
|
p = error_pubsub
|
|
|
|
node_ids = {item["NodeID"] for item in ray.nodes()}
|
|
|
|
# Try to make sure that the monitor has received at least one heartbeat
|
|
# from the node.
|
|
time.sleep(0.5)
|
|
|
|
# Kill both raylets.
|
|
cluster.list_all_nodes()[1].kill_raylet()
|
|
cluster.list_all_nodes()[0].kill_raylet()
|
|
|
|
# Check that we get warning messages for both raylets.
|
|
errors = get_error_message(p, 2, ray_constants.REMOVED_NODE_ERROR, 40)
|
|
|
|
# Extract the client IDs from the error messages. This will need to be
|
|
# changed if the error message changes.
|
|
warning_node_ids = {error.error_message.split(" ")[5] for error in errors}
|
|
|
|
assert node_ids == warning_node_ids
|
|
|
|
|
|
def test_raylet_crash_when_get(ray_start_regular):
|
|
def sleep_to_kill_raylet():
|
|
# Don't kill raylet before default workers get connected.
|
|
time.sleep(2)
|
|
ray.worker._global_node.kill_raylet()
|
|
|
|
object_ref = ray.put(np.zeros(200 * 1024, dtype=np.uint8))
|
|
ray.internal.free(object_ref)
|
|
|
|
thread = threading.Thread(target=sleep_to_kill_raylet)
|
|
thread.start()
|
|
with pytest.raises(ray.exceptions.ObjectLostError):
|
|
ray.get(object_ref)
|
|
thread.join()
|
|
|
|
|
|
def test_connect_with_disconnected_node(shutdown_only):
|
|
config = {
|
|
"num_heartbeats_timeout": 50,
|
|
"raylet_heartbeat_timeout_milliseconds": 10,
|
|
}
|
|
cluster = Cluster()
|
|
cluster.add_node(num_cpus=0, _system_config=config)
|
|
ray.init(address=cluster.address)
|
|
p = init_error_pubsub()
|
|
errors = get_error_message(p, 1, timeout=5)
|
|
assert len(errors) == 0
|
|
# This node is killed by SIGKILL, ray_monitor will mark it to dead.
|
|
dead_node = cluster.add_node(num_cpus=0)
|
|
cluster.remove_node(dead_node, allow_graceful=False)
|
|
errors = get_error_message(p, 1, ray_constants.REMOVED_NODE_ERROR)
|
|
assert len(errors) == 1
|
|
# This node is killed by SIGKILL, ray_monitor will mark it to dead.
|
|
dead_node = cluster.add_node(num_cpus=0)
|
|
cluster.remove_node(dead_node, allow_graceful=False)
|
|
errors = get_error_message(p, 1, ray_constants.REMOVED_NODE_ERROR)
|
|
assert len(errors) == 1
|
|
# This node is killed by SIGTERM, ray_monitor will not mark it again.
|
|
removing_node = cluster.add_node(num_cpus=0)
|
|
cluster.remove_node(removing_node, allow_graceful=True)
|
|
errors = get_error_message(p, 1, timeout=2)
|
|
assert len(errors) == 0
|
|
# There is no connection error to a dead node.
|
|
errors = get_error_message(p, 1, timeout=2)
|
|
assert len(errors) == 0
|
|
p.close()
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"ray_start_cluster_head", [{
|
|
"num_cpus": 5,
|
|
"object_store_memory": 10**8,
|
|
"_system_config": {
|
|
"object_store_full_max_retries": 0
|
|
}
|
|
}],
|
|
indirect=True)
|
|
def test_parallel_actor_fill_plasma_retry(ray_start_cluster_head):
|
|
@ray.remote
|
|
class LargeMemoryActor:
|
|
def some_expensive_task(self):
|
|
return np.zeros(10**8 // 2, dtype=np.uint8)
|
|
|
|
actors = [LargeMemoryActor.remote() for _ in range(5)]
|
|
for _ in range(10):
|
|
pending = [a.some_expensive_task.remote() for a in actors]
|
|
while pending:
|
|
[done], pending = ray.wait(pending, num_returns=1)
|
|
|
|
|
|
def test_fill_object_store_exception(shutdown_only):
|
|
ray.init(
|
|
num_cpus=2,
|
|
object_store_memory=10**8,
|
|
_system_config={"object_store_full_max_retries": 0})
|
|
|
|
@ray.remote
|
|
def expensive_task():
|
|
return np.zeros((10**8) // 10, dtype=np.uint8)
|
|
|
|
with pytest.raises(ray.exceptions.RayTaskError) as e:
|
|
ray.get([expensive_task.remote() for _ in range(20)])
|
|
with pytest.raises(ray.exceptions.ObjectStoreFullError):
|
|
raise e.as_instanceof_cause()
|
|
|
|
@ray.remote
|
|
class LargeMemoryActor:
|
|
def some_expensive_task(self):
|
|
return np.zeros(10**8 + 2, dtype=np.uint8)
|
|
|
|
def test(self):
|
|
return 1
|
|
|
|
actor = LargeMemoryActor.remote()
|
|
with pytest.raises(ray.exceptions.RayTaskError):
|
|
ray.get(actor.some_expensive_task.remote())
|
|
# Make sure actor does not die
|
|
ray.get(actor.test.remote())
|
|
|
|
with pytest.raises(ray.exceptions.ObjectStoreFullError):
|
|
ray.put(np.zeros(10**8 + 2, dtype=np.uint8))
|
|
|
|
|
|
def test_fill_object_store_lru_fallback(shutdown_only):
|
|
config = {
|
|
"free_objects_batch_size": 1,
|
|
}
|
|
ray.init(
|
|
num_cpus=2,
|
|
object_store_memory=10**8,
|
|
_lru_evict=True,
|
|
_system_config=config)
|
|
|
|
@ray.remote
|
|
def expensive_task():
|
|
return np.zeros((10**8) // 2, dtype=np.uint8)
|
|
|
|
# Check that objects out of scope are cleaned up quickly.
|
|
ray.get(expensive_task.remote())
|
|
start = time.time()
|
|
for _ in range(3):
|
|
ray.get(expensive_task.remote())
|
|
end = time.time()
|
|
assert end - start < 3
|
|
|
|
obj_refs = []
|
|
for _ in range(3):
|
|
obj_ref = expensive_task.remote()
|
|
ray.get(obj_ref)
|
|
obj_refs.append(obj_ref)
|
|
|
|
@ray.remote
|
|
class LargeMemoryActor:
|
|
def some_expensive_task(self):
|
|
return np.zeros(10**8 // 2, dtype=np.uint8)
|
|
|
|
def test(self):
|
|
return 1
|
|
|
|
actor = LargeMemoryActor.remote()
|
|
for _ in range(3):
|
|
obj_ref = actor.some_expensive_task.remote()
|
|
ray.get(obj_ref)
|
|
obj_refs.append(obj_ref)
|
|
# Make sure actor does not die
|
|
ray.get(actor.test.remote())
|
|
|
|
for _ in range(3):
|
|
obj_ref = ray.put(np.zeros(10**8 // 2, dtype=np.uint8))
|
|
ray.get(obj_ref)
|
|
obj_refs.append(obj_ref)
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"ray_start_cluster", [{
|
|
"num_nodes": 1,
|
|
"num_cpus": 2,
|
|
}, {
|
|
"num_nodes": 2,
|
|
"num_cpus": 1,
|
|
}],
|
|
indirect=True)
|
|
def test_eviction(ray_start_cluster):
|
|
@ray.remote
|
|
def large_object():
|
|
return np.zeros(10 * 1024 * 1024)
|
|
|
|
obj = large_object.remote()
|
|
assert (isinstance(ray.get(obj), np.ndarray))
|
|
# Evict the object.
|
|
ray.internal.free([obj])
|
|
# ray.get throws an exception.
|
|
with pytest.raises(ray.exceptions.ObjectLostError):
|
|
ray.get(obj)
|
|
|
|
@ray.remote
|
|
def dependent_task(x):
|
|
return
|
|
|
|
# If the object is passed by reference, the task throws an
|
|
# exception.
|
|
with pytest.raises(ray.exceptions.RayTaskError):
|
|
ray.get(dependent_task.remote(obj))
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"ray_start_cluster", [{
|
|
"num_nodes": 2,
|
|
"num_cpus": 1,
|
|
}, {
|
|
"num_nodes": 1,
|
|
"num_cpus": 2,
|
|
}],
|
|
indirect=True)
|
|
def test_serialized_id(ray_start_cluster):
|
|
@ray.remote
|
|
def small_object():
|
|
# Sleep a bit before creating the object to force a timeout
|
|
# at the getter.
|
|
time.sleep(1)
|
|
return 1
|
|
|
|
@ray.remote
|
|
def dependent_task(x):
|
|
return x
|
|
|
|
@ray.remote
|
|
def get(obj_refs, test_dependent_task):
|
|
print("get", obj_refs)
|
|
obj_ref = obj_refs[0]
|
|
if test_dependent_task:
|
|
assert ray.get(dependent_task.remote(obj_ref)) == 1
|
|
else:
|
|
assert ray.get(obj_ref) == 1
|
|
|
|
obj = small_object.remote()
|
|
ray.get(get.remote([obj], False))
|
|
|
|
obj = small_object.remote()
|
|
ray.get(get.remote([obj], True))
|
|
|
|
obj = ray.put(1)
|
|
ray.get(get.remote([obj], False))
|
|
|
|
obj = ray.put(1)
|
|
ray.get(get.remote([obj], True))
|
|
|
|
|
|
@pytest.mark.parametrize("use_actors,node_failure",
|
|
[(False, False), (False, True), (True, False),
|
|
(True, True)])
|
|
def test_fate_sharing(ray_start_cluster, use_actors, node_failure):
|
|
config = {
|
|
"num_heartbeats_timeout": 10,
|
|
"raylet_heartbeat_timeout_milliseconds": 100,
|
|
}
|
|
cluster = Cluster()
|
|
# Head node with no resources.
|
|
cluster.add_node(num_cpus=0, _system_config=config)
|
|
ray.init(address=cluster.address)
|
|
# Node to place the parent actor.
|
|
node_to_kill = cluster.add_node(num_cpus=1, resources={"parent": 1})
|
|
# Node to place the child actor.
|
|
cluster.add_node(num_cpus=1, resources={"child": 1})
|
|
cluster.wait_for_nodes()
|
|
|
|
@ray.remote
|
|
def sleep():
|
|
time.sleep(1000)
|
|
|
|
@ray.remote(resources={"child": 1})
|
|
def probe():
|
|
return
|
|
|
|
# TODO(swang): This test does not pass if max_restarts > 0 for the
|
|
# raylet codepath. Add this parameter once the GCS actor service is enabled
|
|
# by default.
|
|
@ray.remote
|
|
class Actor(object):
|
|
def __init__(self):
|
|
return
|
|
|
|
def start_child(self, use_actors):
|
|
if use_actors:
|
|
child = Actor.options(resources={"child": 1}).remote()
|
|
ray.get(child.sleep.remote())
|
|
else:
|
|
ray.get(sleep.options(resources={"child": 1}).remote())
|
|
|
|
def sleep(self):
|
|
time.sleep(1000)
|
|
|
|
def get_pid(self):
|
|
return os.getpid()
|
|
|
|
# Returns whether the "child" resource is available.
|
|
def child_resource_available():
|
|
p = probe.remote()
|
|
ready, _ = ray.wait([p], timeout=1)
|
|
return len(ready) > 0
|
|
|
|
# Test fate sharing if the parent process dies.
|
|
def test_process_failure(use_actors):
|
|
a = Actor.options(resources={"parent": 1}).remote()
|
|
pid = ray.get(a.get_pid.remote())
|
|
a.start_child.remote(use_actors=use_actors)
|
|
# Wait for the child to be scheduled.
|
|
wait_for_condition(lambda: not child_resource_available())
|
|
# Kill the parent process.
|
|
os.kill(pid, 9)
|
|
wait_for_condition(child_resource_available)
|
|
|
|
# Test fate sharing if the parent node dies.
|
|
def test_node_failure(node_to_kill, use_actors):
|
|
a = Actor.options(resources={"parent": 1}).remote()
|
|
a.start_child.remote(use_actors=use_actors)
|
|
# Wait for the child to be scheduled.
|
|
wait_for_condition(lambda: not child_resource_available())
|
|
# Kill the parent process.
|
|
cluster.remove_node(node_to_kill, allow_graceful=False)
|
|
node_to_kill = cluster.add_node(num_cpus=1, resources={"parent": 1})
|
|
wait_for_condition(child_resource_available)
|
|
return node_to_kill
|
|
|
|
if node_failure:
|
|
test_node_failure(node_to_kill, use_actors)
|
|
else:
|
|
test_process_failure(use_actors)
|
|
|
|
ray.state.state._check_connected()
|
|
keys = [
|
|
key for r in ray.state.state.redis_clients
|
|
for key in r.keys("WORKER_FAILURE*")
|
|
]
|
|
if node_failure:
|
|
assert len(keys) <= 1, len(keys)
|
|
else:
|
|
assert len(keys) <= 2, len(keys)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
import pytest
|
|
sys.exit(pytest.main(["-v", __file__]))
|