Deprecate non-direct-call API (#7336)

This commit is contained in:
Edward Oakes
2020-02-27 10:37:23 -08:00
committed by GitHub
parent 55ccfb6089
commit d9027acaf2
16 changed files with 77 additions and 426 deletions
+2 -4
View File
@@ -782,7 +782,6 @@ cdef class CoreWorker:
FunctionDescriptor function_descriptor,
args,
int num_return_vals,
c_bool is_direct_call,
resources,
int max_retries):
cdef:
@@ -795,7 +794,7 @@ cdef class CoreWorker:
with self.profile_event(b"submit_task"):
prepare_resources(resources, &c_resources)
task_options = CTaskOptions(
num_return_vals, is_direct_call, c_resources)
num_return_vals, True, c_resources)
ray_function = CRayFunction(
language.lang, function_descriptor.descriptor)
prepare_args(self, args, &args_vector)
@@ -814,7 +813,6 @@ cdef class CoreWorker:
uint64_t max_reconstructions,
resources,
placement_resources,
c_bool is_direct_call,
int32_t max_concurrency,
c_bool is_detached,
c_bool is_asyncio,
@@ -838,7 +836,7 @@ cdef class CoreWorker:
check_status(self.core_worker.get().CreateActor(
ray_function, args_vector,
CActorCreationOptions(
max_reconstructions, is_direct_call, max_concurrency,
max_reconstructions, True, max_concurrency,
c_resources, c_placement_resources,
dynamic_worker_options, is_detached, is_asyncio),
extension_data,
+2 -10
View File
@@ -444,8 +444,8 @@ class ActorClass:
args = []
if kwargs is None:
kwargs = {}
if is_direct_call is None:
is_direct_call = ray_constants.direct_call_enabled()
if is_direct_call is not None and not is_direct_call:
raise ValueError("Non-direct call actors are no longer supported.")
meta = self.__ray_metadata__
actor_has_async_methods = len(
@@ -460,16 +460,9 @@ class ActorClass:
else:
max_concurrency = 1
if max_concurrency > 1 and not is_direct_call:
raise ValueError(
"setting max_concurrency requires is_direct_call=True")
if max_concurrency < 1:
raise ValueError("max_concurrency must be >= 1")
if is_asyncio and not is_direct_call:
raise ValueError(
"Setting is_asyncio requires is_direct_call=True.")
worker = ray.worker.get_global_worker()
if worker.mode is None:
raise Exception("Actors cannot be created before ray.init() "
@@ -566,7 +559,6 @@ class ActorClass:
meta.max_reconstructions,
resources,
actor_placement_resources,
is_direct_call,
max_concurrency,
detached,
is_asyncio,
+11 -13
View File
@@ -1,16 +1,15 @@
import asyncio
import time
import os
import pytest
import numpy as np
import ray
from ray.experimental import async_api
@pytest.fixture
def init():
os.environ["RAY_FORCE_DIRECT"] = "0"
ray.init(num_cpus=4)
async_api.init()
asyncio.get_event_loop().set_debug(False)
@@ -23,21 +22,20 @@ def gen_tasks(time_scale=0.1):
@ray.remote
def f(n):
time.sleep(n * time_scale)
return n
return n, np.zeros(1024 * 1024, dtype=np.uint8)
tasks = [f.remote(i) for i in range(5)]
return tasks
return [f.remote(i) for i in range(5)]
def test_simple(init):
@ray.remote
def f():
time.sleep(1)
return {"key1": ["value"]}
return np.zeros(1024 * 1024, dtype=np.uint8)
future = async_api.as_future(f.remote())
result = asyncio.get_event_loop().run_until_complete(future)
assert result["key1"] == ["value"]
assert isinstance(result, np.ndarray)
def test_gather(init):
@@ -45,7 +43,7 @@ def test_gather(init):
tasks = gen_tasks()
futures = [async_api.as_future(obj_id) for obj_id in tasks]
results = loop.run_until_complete(asyncio.gather(*futures))
assert all(a == b for a, b in zip(results, ray.get(tasks)))
assert all(a[0] == b[0] for a, b in zip(results, ray.get(tasks)))
def test_wait(init):
@@ -71,11 +69,11 @@ def test_gather_mixup(init):
@ray.remote
def f(n):
time.sleep(n * 0.1)
return n
return n, np.zeros(1024 * 1024, dtype=np.uint8)
async def g(n):
await asyncio.sleep(n * 0.1)
return n
return n, np.zeros(1024 * 1024, dtype=np.uint8)
tasks = [
async_api.as_future(f.remote(1)),
@@ -84,7 +82,7 @@ def test_gather_mixup(init):
g(4)
]
results = loop.run_until_complete(asyncio.gather(*tasks))
assert results == [1, 2, 3, 4]
assert [result[0] for result in results] == [1, 2, 3, 4]
def test_wait_mixup(init):
@@ -93,7 +91,7 @@ def test_wait_mixup(init):
@ray.remote
def f(n):
time.sleep(n)
return n
return n, np.zeros(1024 * 1024, dtype=np.uint8)
def g(n):
async def _g(_n):
-4
View File
@@ -13,10 +13,6 @@ def env_integer(key, default):
return default
def direct_call_enabled():
return bool(int(os.environ.get("RAY_FORCE_DIRECT", "1")))
ID_SIZE = 20
# The default maximum number of bytes to allocate to the object store unless
+3 -5
View File
@@ -2,7 +2,6 @@ import logging
from functools import wraps
from ray import cloudpickle as pickle
from ray import ray_constants
from ray._raylet import PythonFunctionDescriptor
from ray import cross_language, Language
import ray.signature
@@ -96,7 +95,6 @@ class RemoteFunction:
return self._remote(args=args, kwargs=kwargs)
self.remote = _remote_proxy
self.direct_call_enabled = ray_constants.direct_call_enabled()
def __call__(self, *args, **kwargs):
raise Exception("Remote functions cannot be called directly. Instead "
@@ -182,8 +180,8 @@ class RemoteFunction:
if num_return_vals is None:
num_return_vals = self._num_return_vals
if is_direct_call is None:
is_direct_call = self.direct_call_enabled
if is_direct_call is not None and not is_direct_call:
raise ValueError("Non-direct call tasks are no longer supported.")
if max_retries is None:
max_retries = self._max_retries
@@ -211,7 +209,7 @@ class RemoteFunction:
else:
object_ids = worker.core_worker.submit_task(
self._language, self._function_descriptor, list_args,
num_return_vals, is_direct_call, resources, max_retries)
num_return_vals, resources, max_retries)
if len(object_ids) == 1:
return object_ids[0]
-91
View File
@@ -12,9 +12,6 @@ import time
import ray
import ray.test_utils
import ray.cluster_utils
from ray import ray_constants
RAY_FORCE_DIRECT = ray_constants.direct_call_enabled()
def test_actor_deletion_with_gpus(shutdown_only):
@@ -83,94 +80,6 @@ def test_actor_class_methods(ray_start_regular):
assert ray.get(a.g.remote(2)) == 4
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no actor method resources")
def test_resource_assignment(shutdown_only):
"""Test to make sure that we assign resource to actors at instantiation."""
# This test will create 16 actors. Declaring this many CPUs initially will
# speed up the test because the workers will be started ahead of time.
ray.init(
num_cpus=16,
num_gpus=1,
resources={"Custom": 1},
object_store_memory=int(150 * 1024 * 1024))
class Actor:
def __init__(self):
self.resources = ray.get_resource_ids()
def get_actor_resources(self):
return self.resources
def get_actor_method_resources(self):
return ray.get_resource_ids()
decorator_resource_args = [{}, {
"num_cpus": 0.1
}, {
"num_gpus": 0.1
}, {
"resources": {
"Custom": 0.1
}
}]
instantiation_resource_args = [{}, {
"num_cpus": 0.2
}, {
"num_gpus": 0.2
}, {
"resources": {
"Custom": 0.2
}
}]
for decorator_args in decorator_resource_args:
for instantiation_args in instantiation_resource_args:
if len(decorator_args) == 0:
actor_class = ray.remote(Actor)
else:
actor_class = ray.remote(**decorator_args)(Actor)
actor = actor_class._remote(**instantiation_args)
actor_resources = ray.get(actor.get_actor_resources.remote())
actor_method_resources = ray.get(
actor.get_actor_method_resources.remote())
if len(decorator_args) == 0 and len(instantiation_args) == 0:
assert len(actor_resources) == 0, (
"Actor should not be assigned resources.")
assert list(actor_method_resources.keys()) == [
"CPU"
], ("Actor method should only have CPUs")
assert actor_method_resources["CPU"][0][1] == 1, (
"Actor method should default to one cpu.")
else:
if ("num_cpus" not in decorator_args
and "num_cpus" not in instantiation_args):
assert actor_resources["CPU"][0][1] == 1, (
"Actor should default to one cpu.")
correct_resources = {}
defined_resources = decorator_args.copy()
defined_resources.update(instantiation_args)
for resource, value in defined_resources.items():
if resource == "num_cpus":
correct_resources["CPU"] = value
elif resource == "num_gpus":
correct_resources["GPU"] = value
elif resource == "resources":
for custom_resource, amount in value.items():
correct_resources[custom_resource] = amount
for resource, amount in correct_resources.items():
assert (actor_resources[resource][0][0] ==
actor_method_resources[resource][0][0]), (
"Should have assigned same {} for both actor ",
"and actor method.".format(resource))
assert (actor_resources[resource][0][
1] == actor_method_resources[resource][0][1]), (
"Should have assigned same amount of {} for both ",
"actor and actor method.".format(resource))
assert actor_resources[resource][0][1] == amount, (
"Actor should have {amount} {resource} but has ",
"{amount} {resource}".format(
amount=amount, resource=resource))
@pytest.mark.skipif(
os.environ.get("RAY_USE_NEW_GCS") == "on",
reason="Failing with new GCS API on Linux.")
+1 -89
View File
@@ -12,7 +12,6 @@ import numpy as np
import pytest
import ray
import ray.ray_constants as ray_constants
import ray.cluster_utils
import ray.test_utils
@@ -247,7 +246,7 @@ def test_wait_cluster(ray_start_cluster):
assert len(unready) == 0
@pytest.mark.skipif(ray_constants.direct_call_enabled(), reason="TODO(ekl)")
@pytest.mark.skip(reason="TODO(ekl)")
def test_object_transfer_dump(ray_start_cluster):
cluster = ray_start_cluster
@@ -507,93 +506,6 @@ def test_multithreading(ray_start_2_cpus):
ray.get(actor.join.remote()) == "ok"
@pytest.mark.skipif(
ray_constants.direct_call_enabled(), reason="uses task and object table")
def test_free_objects_multi_node(ray_start_cluster):
# This test will do following:
# 1. Create 3 raylets that each hold an actor.
# 2. Each actor creates an object which is the deletion target.
# 3. Wait 0.1 second for the objects to be deleted.
# 4. Check that the deletion targets have been deleted.
# Caution: if remote functions are used instead of actor methods,
# one raylet may create more than one worker to execute the
# tasks, so the flushing operations may be executed in different
# workers and the plasma client holding the deletion target
# may not be flushed.
cluster = ray_start_cluster
config = json.dumps({"object_manager_repeated_push_delay_ms": 1000})
for i in range(3):
cluster.add_node(
num_cpus=1,
resources={"Custom{}".format(i): 1},
_internal_config=config)
ray.init(address=cluster.address)
class RawActor:
def get(self):
return ray.worker.global_worker.node.unique_id
ActorOnNode0 = ray.remote(resources={"Custom0": 1})(RawActor)
ActorOnNode1 = ray.remote(resources={"Custom1": 1})(RawActor)
ActorOnNode2 = ray.remote(resources={"Custom2": 1})(RawActor)
def create(actors):
a = actors[0].get.remote()
b = actors[1].get.remote()
c = actors[2].get.remote()
(l1, l2) = ray.wait([a, b, c], num_returns=3)
assert len(l1) == 3
assert len(l2) == 0
return (a, b, c)
def run_one_test(actors, local_only, delete_creating_tasks):
(a, b, c) = create(actors)
# The three objects should be generated on different object stores.
assert ray.get(a) != ray.get(b)
assert ray.get(a) != ray.get(c)
assert ray.get(c) != ray.get(b)
ray.internal.free(
[a, b, c],
local_only=local_only,
delete_creating_tasks=delete_creating_tasks)
# Wait for the objects to be deleted.
time.sleep(0.1)
return (a, b, c)
actors = [
ActorOnNode0.remote(),
ActorOnNode1.remote(),
ActorOnNode2.remote()
]
# Case 1: run this local_only=False. All 3 objects will be deleted.
(a, b, c) = run_one_test(actors, False, False)
(l1, l2) = ray.wait([a, b, c], timeout=0.01, num_returns=1)
# All the objects are deleted.
assert len(l1) == 0
assert len(l2) == 3
# Case 2: run this local_only=True. Only 1 object will be deleted.
(a, b, c) = run_one_test(actors, True, False)
(l1, l2) = ray.wait([a, b, c], timeout=0.01, num_returns=3)
# One object is deleted and 2 objects are not.
assert len(l1) == 2
assert len(l2) == 1
# The deleted object will have the same store with the driver.
local_return = ray.worker.global_worker.node.unique_id
for object_id in l1:
assert ray.get(object_id) != local_return
# Case3: These cases test the deleting creating tasks for the object.
(a, b, c) = run_one_test(actors, False, False)
task_table = ray.tasks()
for obj in [a, b, c]:
assert ray._raylet.compute_task_id(obj).hex() in task_table
(a, b, c) = run_one_test(actors, False, True)
task_table = ray.tasks()
for obj in [a, b, c]:
assert ray._raylet.compute_task_id(obj).hex() not in task_table
def test_local_mode(shutdown_only):
@ray.remote
def local_mode_f():
-64
View File
@@ -15,7 +15,6 @@ import pickle
import pytest
import ray
from ray import signature
import ray.ray_constants as ray_constants
import ray.cluster_utils
import ray.test_utils
@@ -191,69 +190,6 @@ def test_global_state_api(shutdown_only):
assert job_table[0]["NodeManagerAddress"] == node_ip_address
@pytest.mark.skipif(
ray_constants.direct_call_enabled(),
reason="object and task API not supported")
def test_global_state_task_object_api(shutdown_only):
ray.init()
job_id = ray.utils.compute_job_id_from_driver(
ray.WorkerID(ray.worker.global_worker.worker_id))
driver_task_id = ray.worker.global_worker.current_task_id.hex()
nil_actor_id_hex = ray.ActorID.nil().hex()
@ray.remote
def f(*xs):
return 1
x_id = ray.put(1)
result_id = f.remote(1, "hi", x_id)
# Wait for one additional task to complete.
wait_for_num_tasks(1 + 1)
task_table = ray.tasks()
assert len(task_table) == 1 + 1
task_id_set = set(task_table.keys())
task_id_set.remove(driver_task_id)
task_id = list(task_id_set)[0]
task_spec = task_table[task_id]["TaskSpec"]
assert task_spec["ActorID"] == nil_actor_id_hex
assert task_spec["Args"] == [
signature.DUMMY_TYPE, 1, signature.DUMMY_TYPE, "hi",
signature.DUMMY_TYPE, x_id
]
assert task_spec["JobID"] == job_id.hex()
assert task_spec["ReturnObjectIDs"] == [result_id]
assert task_table[task_id] == ray.tasks(task_id)
# Wait for two objects, one for the x_id and one for result_id.
wait_for_num_objects(2)
def wait_for_object_table():
timeout = 10
start_time = time.time()
while time.time() - start_time < timeout:
object_table = ray.objects()
tables_ready = (object_table[x_id]["ManagerIDs"] is not None and
object_table[result_id]["ManagerIDs"] is not None)
if tables_ready:
return
time.sleep(0.1)
raise RayTestTimeoutException(
"Timed out while waiting for object table to "
"update.")
object_table = ray.objects()
assert len(object_table) == 2
assert object_table[x_id] == ray.objects(x_id)
object_table_entry = ray.objects(result_id)
assert object_table[result_id] == object_table_entry
# TODO(rkn): Pytest actually has tools for capturing stdout and stderr, so we
# should use those, but they seem to conflict with Ray's use of faulthandler.
class CaptureOutputAndError:
+12 -13
View File
@@ -22,7 +22,7 @@ def test_asyncio_actor(ray_start_regular_shared):
await self.event.wait()
return sorted(self.batch)
a = AsyncBatcher.options(is_direct_call=True).remote()
a = AsyncBatcher.remote()
x1 = a.add.remote(1)
x2 = a.add.remote(2)
x3 = a.add.remote(3)
@@ -42,7 +42,7 @@ def test_asyncio_actor_same_thread(ray_start_regular_shared):
async def async_thread_id(self):
return threading.current_thread().ident
a = Actor.options(is_direct_call=True).remote()
a = Actor.remote()
sync_id, async_id = ray.get(
[a.sync_thread_id.remote(),
a.async_thread_id.remote()])
@@ -66,7 +66,7 @@ def test_asyncio_actor_concurrency(ray_start_regular_shared):
num_calls = 10
a = RecordOrder.options(is_direct_call=True, max_concurrency=1).remote()
a = RecordOrder.options(max_concurrency=1).remote()
ray.get([a.do_work.remote() for _ in range(num_calls)])
history = ray.get(a.get_history.remote())
@@ -99,8 +99,8 @@ def test_asyncio_actor_high_concurrency(ray_start_regular_shared):
return sorted(self.batch)
batch_size = sys.getrecursionlimit() * 4
actor = AsyncConcurrencyBatcher.options(
max_concurrency=batch_size * 2, is_direct_call=True).remote(batch_size)
actor = AsyncConcurrencyBatcher.options(max_concurrency=batch_size *
2).remote(batch_size)
result = ray.get([actor.add.remote(i) for i in range(batch_size)])
assert result[0] == list(range(batch_size))
assert result[-1] == list(range(batch_size))
@@ -130,11 +130,11 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop):
with pytest.raises(ray.exceptions.RayTaskError):
await ray.async_compat.get_async(task_throws.remote())
# Test Direct Actor Call
# Test actor calls.
str_len = 200 * 1024
@ray.remote
class DirectActor:
class Actor:
def echo(self, i):
return i
@@ -145,18 +145,17 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop):
def throw_error(self):
1 / 0
direct = DirectActor.options(is_direct_call=True).remote()
actor = Actor.remote()
direct_actor_call_future = ray.async_compat.get_async(
direct.echo.remote(2))
assert await direct_actor_call_future == 2
actor_call_future = ray.async_compat.get_async(actor.echo.remote(2))
assert await actor_call_future == 2
promoted_to_plasma_future = ray.async_compat.get_async(
direct.big_object.remote())
actor.big_object.remote())
assert await promoted_to_plasma_future == "a" * str_len
with pytest.raises(ray.exceptions.RayTaskError):
await ray.async_compat.get_async(direct.throw_error.remote())
await ray.async_compat.get_async(actor.throw_error.remote())
def test_asyncio_actor_async_get(ray_start_regular_shared):
+29 -75
View File
@@ -1348,27 +1348,6 @@ def test_get_with_timeout(ray_start_regular):
assert time.time() - start < 30
@pytest.mark.parametrize(
"ray_start_cluster", [{
"num_cpus": 1,
"num_nodes": 1,
}, {
"num_cpus": 1,
"num_nodes": 2,
}],
indirect=True)
def test_direct_call_simple(ray_start_cluster):
@ray.remote
def f(x):
return x + 1
f_direct = f.options(is_direct_call=True)
assert ray.get(f_direct.remote(2)) == 3
for _ in range(10):
assert ray.get([f_direct.remote(i) for i in range(100)]) == list(
range(1, 101))
# https://github.com/ray-project/ray/issues/6329
def test_call_actors_indirect_through_tasks(ray_start_regular):
@ray.remote
@@ -1399,28 +1378,7 @@ def test_call_actors_indirect_through_tasks(ray_start_regular):
ray.get(zoo.remote([c]))
def test_direct_call_refcount(ray_start_regular):
@ray.remote
def f(x):
return x + 1
@ray.remote
def sleep():
time.sleep(.1)
return 1
# Multiple gets should not hang with ref counting enabled.
f_direct = f.options(is_direct_call=True)
x = f_direct.remote(2)
ray.get(x)
ray.get(x)
# Temporary objects should be retained for chained callers.
y = f_direct.remote(sleep.options(is_direct_call=True).remote())
assert ray.get(y) == 2
def test_direct_call_matrix(shutdown_only):
def test_call_matrix(shutdown_only):
ray.init(object_store_memory=1000 * 1024 * 1024)
@ray.remote
@@ -1456,23 +1414,23 @@ def test_direct_call_matrix(shutdown_only):
if is_large else "small_object", "out_of_band"
if out_of_band else "in_band")
if source_actor:
a = Actor.options(is_direct_call=True).remote()
a = Actor.remote()
if is_large:
x_id = a.large_value.remote()
else:
x_id = a.small_value.remote()
else:
if is_large:
x_id = large_value.options(is_direct_call=True).remote()
x_id = large_value.remote()
else:
x_id = small_value.options(is_direct_call=True).remote()
x_id = small_value.remote()
if out_of_band:
x_id = [x_id]
if dest_actor:
b = Actor.options(is_direct_call=True).remote()
b = Actor.remote()
x = ray.get(b.echo.remote(x_id))
else:
x = ray.get(echo.options(is_direct_call=True).remote(x_id))
x = ray.get(echo.remote(x_id))
if is_large:
assert isinstance(x, np.ndarray)
else:
@@ -1494,19 +1452,18 @@ def test_direct_call_matrix(shutdown_only):
"num_nodes": 2,
}],
indirect=True)
def test_direct_call_chain(ray_start_cluster):
def test_call_chain(ray_start_cluster):
@ray.remote
def g(x):
return x + 1
g_direct = g.options(is_direct_call=True)
x = 0
for _ in range(100):
x = g_direct.remote(x)
x = g.remote(x)
assert ray.get(x) == 100
def test_direct_inline_arg_memory_corruption(ray_start_regular):
def test_inline_arg_memory_corruption(ray_start_regular):
@ray.remote
def f():
return np.zeros(1000, dtype=np.uint8)
@@ -1521,13 +1478,12 @@ def test_direct_inline_arg_memory_corruption(ray_start_regular):
for prev in self.z:
assert np.sum(prev) == 0, ("memory corruption detected", prev)
a = Actor.options(is_direct_call=True).remote()
f_direct = f.options(is_direct_call=True)
a = Actor.remote()
for i in range(100):
ray.get(a.add.remote(f_direct.remote()))
ray.get(a.add.remote(f.remote()))
def test_direct_actor_enabled(ray_start_regular):
def test_skip_plasma(ray_start_regular):
@ray.remote
class Actor:
def __init__(self):
@@ -1536,14 +1492,14 @@ def test_direct_actor_enabled(ray_start_regular):
def f(self, x):
return x * 2
a = Actor._remote(is_direct_call=True)
a = Actor.remote()
obj_id = a.f.remote(1)
# it is not stored in plasma
assert not ray.worker.global_worker.core_worker.object_exists(obj_id)
assert ray.get(obj_id) == 2
def test_direct_actor_order(shutdown_only):
def test_actor_call_order(shutdown_only):
ray.init(num_cpus=4)
@ray.remote
@@ -1561,14 +1517,12 @@ def test_direct_actor_order(shutdown_only):
self.count += 1
return count
a = Actor._remote(is_direct_call=True)
assert ray.get([
a.inc.remote(i, small_value.options(is_direct_call=True).remote())
for i in range(100)
]) == list(range(100))
a = Actor.remote()
assert ray.get([a.inc.remote(i, small_value.remote())
for i in range(100)]) == list(range(100))
def test_direct_actor_large_objects(ray_start_regular):
def test_actor_large_objects(ray_start_regular):
@ray.remote
class Actor:
def __init__(self):
@@ -1578,7 +1532,7 @@ def test_direct_actor_large_objects(ray_start_regular):
time.sleep(1)
return np.zeros(10000000)
a = Actor._remote(is_direct_call=True)
a = Actor.remote()
obj_id = a.f.remote()
assert not ray.worker.global_worker.core_worker.object_exists(obj_id)
done, _ = ray.wait([obj_id])
@@ -1587,7 +1541,7 @@ def test_direct_actor_large_objects(ray_start_regular):
assert isinstance(ray.get(obj_id), np.ndarray)
def test_direct_actor_pass_by_ref(ray_start_regular):
def test_actor_pass_by_ref(ray_start_regular):
@ray.remote
class Actor:
def __init__(self):
@@ -1604,7 +1558,7 @@ def test_direct_actor_pass_by_ref(ray_start_regular):
def error():
sys.exit(0)
a = Actor._remote(is_direct_call=True)
a = Actor.remote()
assert ray.get(a.f.remote(f.remote(1))) == 2
fut = [a.f.remote(f.remote(i)) for i in range(100)]
@@ -1615,7 +1569,7 @@ def test_direct_actor_pass_by_ref(ray_start_regular):
ray.get(a.f.remote(error.remote()))
def test_direct_actor_pass_by_ref_order_optimization(shutdown_only):
def test_actor_pass_by_ref_order_optimization(shutdown_only):
ray.init(num_cpus=4)
@ray.remote
@@ -1626,7 +1580,7 @@ def test_direct_actor_pass_by_ref_order_optimization(shutdown_only):
def f(self, x):
pass
a = Actor._remote(is_direct_call=True)
a = Actor.remote()
@ray.remote
def fast_value():
@@ -1652,7 +1606,7 @@ def test_direct_actor_pass_by_ref_order_optimization(shutdown_only):
assert delta < 10, "did not skip slow value"
def test_direct_actor_recursive(ray_start_regular):
def test_actor_recursive(ray_start_regular):
@ray.remote
class Actor:
def __init__(self, delegate=None):
@@ -1663,9 +1617,9 @@ def test_direct_actor_recursive(ray_start_regular):
return ray.get(self.delegate.f.remote(x))
return x * 2
a = Actor._remote(is_direct_call=True)
b = Actor._remote(args=[a], is_direct_call=True)
c = Actor._remote(args=[b], is_direct_call=True)
a = Actor.remote()
b = Actor.remote(a)
c = Actor.remote(b)
result = ray.get([c.f.remote(i) for i in range(100)])
assert result == [x * 2 for x in range(100)]
@@ -1675,7 +1629,7 @@ def test_direct_actor_recursive(ray_start_regular):
assert result == [x * 2 for x in range(100)]
def test_direct_actor_concurrent(ray_start_regular):
def test_actor_concurrent(ray_start_regular):
@ray.remote
class Batcher:
def __init__(self):
@@ -1690,7 +1644,7 @@ def test_direct_actor_concurrent(ray_start_regular):
self.event.wait()
return sorted(self.batch)
a = Batcher.options(is_direct_call=True, max_concurrency=3).remote()
a = Batcher.options(max_concurrency=3).remote()
x1 = a.add.remote(1)
x2 = a.add.remote(2)
x3 = a.add.remote(3)
+5 -7
View File
@@ -21,8 +21,6 @@ from ray.test_utils import (
RayTestTimeoutException,
)
RAY_FORCE_DIRECT = ray_constants.direct_call_enabled()
def test_failed_task(ray_start_regular):
@ray.remote
@@ -121,13 +119,13 @@ def test_get_throws_quickly_when_found_exception(ray_start_regular):
assert err.type is exception
f = random_path()
actor = Actor.options(is_direct_call=True, max_concurrency=2).remote()
actor = Actor.options(max_concurrency=2).remote()
expect_exception([actor.bad_func1.remote(),
actor.slow_func.remote(f)], ray.exceptions.RayTaskError)
touch(f)
f = random_path()
actor = Actor.options(is_direct_call=True, max_concurrency=2).remote()
actor = Actor.options(max_concurrency=2).remote()
expect_exception([actor.bad_func2.remote(),
actor.slow_func.remote(f)], ray.exceptions.RayActorError)
touch(f)
@@ -590,7 +588,7 @@ def test_export_large_objects(ray_start_regular):
wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 2)
@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO detect resource deadlock")
@pytest.mark.skip(reason="TODO detect resource deadlock")
def test_warning_for_resource_deadlock(shutdown_only):
# Check that we get warning messages for infeasible tasks.
ray.init(num_cpus=1)
@@ -955,7 +953,7 @@ def test_fill_object_store_exception(shutdown_only):
"num_cpus": 1,
}],
indirect=True)
def test_direct_call_eviction(ray_start_cluster):
def test_eviction(ray_start_cluster):
@ray.remote
def large_object():
return np.zeros(10 * 1024 * 1024)
@@ -989,7 +987,7 @@ def test_direct_call_eviction(ray_start_cluster):
"num_cpus": 1,
}],
indirect=True)
def test_direct_call_serialized_id_eviction(ray_start_cluster):
def test_serialized_id_eviction(ray_start_cluster):
@ray.remote
def large_object():
return np.zeros(10 * 1024 * 1024)
+1 -4
View File
@@ -4,7 +4,6 @@ import subprocess
import time
import ray
from ray import ray_constants
from ray.test_utils import (
RayTestTimeoutException,
run_string_as_driver,
@@ -480,9 +479,7 @@ print("success")
assert "success" in out
@pytest.mark.skipif(
ray_constants.direct_call_enabled(),
reason="fate sharing not implemented yet")
@pytest.mark.skip(reason="fate sharing not implemented yet")
def test_driver_exiting_when_worker_blocked(call_ray_start):
# This test will create some drivers that submit some tasks and then
# exit without waiting for the tasks to complete.
+8 -14
View File
@@ -11,8 +11,6 @@ import ray.ray_constants as ray_constants
from ray.cluster_utils import Cluster
from ray.test_utils import RayTestTimeoutException
RAY_FORCE_DIRECT = ray_constants.direct_call_enabled()
@pytest.fixture(params=[(1, 4), (4, 4)])
def ray_start_workers_separate_multinode(request):
@@ -80,20 +78,16 @@ def _test_component_failed(cluster, component_type):
# Submit many tasks with many dependencies.
@ray.remote
def f(x):
if RAY_FORCE_DIRECT:
# Sleep to make sure that tasks actually fail mid-execution. We
# only use it for direct calls because the test already takes a
# long time to run with the raylet codepath.
time.sleep(0.01)
# Sleep to make sure that tasks actually fail mid-execution.
time.sleep(0.01)
return x
@ray.remote
def g(*xs):
if RAY_FORCE_DIRECT:
# Sleep to make sure that tasks actually fail mid-execution. We
# only use it for direct calls because the test already takes a
# long time to run with the raylet codepath.
time.sleep(0.01)
# Sleep to make sure that tasks actually fail mid-execution. We
# only use it for direct calls because the test already takes a
# long time to run with the raylet codepath.
time.sleep(0.01)
return 1
# Kill the component on all nodes except the head node as the tasks
@@ -151,7 +145,7 @@ def check_components_alive(cluster, component_type, check_component_alive):
"num_nodes": 4,
"_internal_config": json.dumps({
# Raylet codepath is not stable with a shorter timeout.
"num_heartbeats_timeout": 10 if RAY_FORCE_DIRECT else 100
"num_heartbeats_timeout": 10
}),
}],
indirect=True)
@@ -175,7 +169,7 @@ def test_raylet_failed(ray_start_cluster):
"num_nodes": 2,
"_internal_config": json.dumps({
# Raylet codepath is not stable with a shorter timeout.
"num_heartbeats_timeout": 10 if RAY_FORCE_DIRECT else 100
"num_heartbeats_timeout": 10
}),
}],
indirect=True)
@@ -9,12 +9,8 @@ import pytest
import ray
import ray.ray_constants as ray_constants
RAY_FORCE_DIRECT = ray_constants.direct_call_enabled()
@pytest.mark.skipif(
RAY_FORCE_DIRECT,
reason="No reconstruction for objects placed in plasma yet")
@pytest.mark.skip(reason="No reconstruction for objects placed in plasma yet")
@pytest.mark.parametrize(
"ray_start_cluster",
[{
@@ -24,7 +20,7 @@ RAY_FORCE_DIRECT = ray_constants.direct_call_enabled()
"object_store_memory": 1000 * 1024 * 1024,
"_internal_config": json.dumps({
# Raylet codepath is not stable with a shorter timeout.
"num_heartbeats_timeout": 10 if RAY_FORCE_DIRECT else 100,
"num_heartbeats_timeout": 10,
"object_manager_pull_timeout_ms": 1000,
"object_manager_push_timeout_ms": 1000,
"object_manager_repeated_push_delay_ms": 1000,
+1 -7
View File
@@ -7,14 +7,8 @@ import time
import warnings
import ray
from ray import ray_constants
from ray.cluster_utils import Cluster
# TODO(yuhguo): This test file requires a lot of CPU/memory, and
# better be put in Jenkins. However, it fails frequently in Jenkins, but
# works well in Travis. We should consider moving it back to Jenkins once
# we figure out the reason.
if (multiprocessing.cpu_count() < 40
or ray.utils.get_system_memory() < 50 * 10**9):
warnings.warn("This test must be run on large machines.")
@@ -42,7 +36,7 @@ def ray_start_cluster_with_resource():
# This test is here to make sure that when we broadcast an object to a bunch of
# machines, we don't have too many excess object transfers.
@pytest.mark.skipif(ray_constants.direct_call_enabled(), reason="TODO(ekl)")
@pytest.mark.skip(reason="TODO(ekl)")
def test_object_broadcast(ray_start_cluster_with_resource):
cluster, num_nodes = ray_start_cluster_with_resource
@@ -2,7 +2,6 @@ import numpy as np
import unittest
import ray
from ray import ray_constants
class TestUnreconstructableErrors(unittest.TestCase):
@@ -23,25 +22,6 @@ class TestUnreconstructableErrors(unittest.TestCase):
self.assertRaises(ray.exceptions.UnreconstructableError,
lambda: ray.get(x_id))
def testLineageEvictedReconstructionFails(self):
if ray_constants.direct_call_enabled():
return # not relevant
@ray.remote
def f(data):
return 0
x_id = f.remote(None)
ray.get(x_id)
# Hold references to the ray.put objects so they aren't LRU'd.
oids = []
for _ in range(400):
new_oids = [f.remote(np.zeros(10000)) for _ in range(50)]
oids.extend(new_oids)
ray.get(new_oids)
self.assertRaises(ray.exceptions.UnreconstructableError,
lambda: ray.get(x_id))
if __name__ == "__main__":
import pytest