diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 2413879c3..16a4b64f3 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -782,7 +782,6 @@ cdef class CoreWorker: FunctionDescriptor function_descriptor, args, int num_return_vals, - c_bool is_direct_call, resources, int max_retries): cdef: @@ -795,7 +794,7 @@ cdef class CoreWorker: with self.profile_event(b"submit_task"): prepare_resources(resources, &c_resources) task_options = CTaskOptions( - num_return_vals, is_direct_call, c_resources) + num_return_vals, True, c_resources) ray_function = CRayFunction( language.lang, function_descriptor.descriptor) prepare_args(self, args, &args_vector) @@ -814,7 +813,6 @@ cdef class CoreWorker: uint64_t max_reconstructions, resources, placement_resources, - c_bool is_direct_call, int32_t max_concurrency, c_bool is_detached, c_bool is_asyncio, @@ -838,7 +836,7 @@ cdef class CoreWorker: check_status(self.core_worker.get().CreateActor( ray_function, args_vector, CActorCreationOptions( - max_reconstructions, is_direct_call, max_concurrency, + max_reconstructions, True, max_concurrency, c_resources, c_placement_resources, dynamic_worker_options, is_detached, is_asyncio), extension_data, diff --git a/python/ray/actor.py b/python/ray/actor.py index 40f045f71..4c456c8ad 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -444,8 +444,8 @@ class ActorClass: args = [] if kwargs is None: kwargs = {} - if is_direct_call is None: - is_direct_call = ray_constants.direct_call_enabled() + if is_direct_call is not None and not is_direct_call: + raise ValueError("Non-direct call actors are no longer supported.") meta = self.__ray_metadata__ actor_has_async_methods = len( @@ -460,16 +460,9 @@ class ActorClass: else: max_concurrency = 1 - if max_concurrency > 1 and not is_direct_call: - raise ValueError( - "setting max_concurrency requires is_direct_call=True") if max_concurrency < 1: raise ValueError("max_concurrency must be >= 1") - if is_asyncio and not is_direct_call: - raise ValueError( - "Setting is_asyncio requires is_direct_call=True.") - worker = ray.worker.get_global_worker() if worker.mode is None: raise Exception("Actors cannot be created before ray.init() " @@ -566,7 +559,6 @@ class ActorClass: meta.max_reconstructions, resources, actor_placement_resources, - is_direct_call, max_concurrency, detached, is_asyncio, diff --git a/python/ray/experimental/test/async_test.py b/python/ray/experimental/test/async_test.py index 3a8f8b922..5c94fc3e3 100644 --- a/python/ray/experimental/test/async_test.py +++ b/python/ray/experimental/test/async_test.py @@ -1,16 +1,15 @@ import asyncio import time -import os - import pytest +import numpy as np + import ray from ray.experimental import async_api @pytest.fixture def init(): - os.environ["RAY_FORCE_DIRECT"] = "0" ray.init(num_cpus=4) async_api.init() asyncio.get_event_loop().set_debug(False) @@ -23,21 +22,20 @@ def gen_tasks(time_scale=0.1): @ray.remote def f(n): time.sleep(n * time_scale) - return n + return n, np.zeros(1024 * 1024, dtype=np.uint8) - tasks = [f.remote(i) for i in range(5)] - return tasks + return [f.remote(i) for i in range(5)] def test_simple(init): @ray.remote def f(): time.sleep(1) - return {"key1": ["value"]} + return np.zeros(1024 * 1024, dtype=np.uint8) future = async_api.as_future(f.remote()) result = asyncio.get_event_loop().run_until_complete(future) - assert result["key1"] == ["value"] + assert isinstance(result, np.ndarray) def test_gather(init): @@ -45,7 +43,7 @@ def test_gather(init): tasks = gen_tasks() futures = [async_api.as_future(obj_id) for obj_id in tasks] results = loop.run_until_complete(asyncio.gather(*futures)) - assert all(a == b for a, b in zip(results, ray.get(tasks))) + assert all(a[0] == b[0] for a, b in zip(results, ray.get(tasks))) def test_wait(init): @@ -71,11 +69,11 @@ def test_gather_mixup(init): @ray.remote def f(n): time.sleep(n * 0.1) - return n + return n, np.zeros(1024 * 1024, dtype=np.uint8) async def g(n): await asyncio.sleep(n * 0.1) - return n + return n, np.zeros(1024 * 1024, dtype=np.uint8) tasks = [ async_api.as_future(f.remote(1)), @@ -84,7 +82,7 @@ def test_gather_mixup(init): g(4) ] results = loop.run_until_complete(asyncio.gather(*tasks)) - assert results == [1, 2, 3, 4] + assert [result[0] for result in results] == [1, 2, 3, 4] def test_wait_mixup(init): @@ -93,7 +91,7 @@ def test_wait_mixup(init): @ray.remote def f(n): time.sleep(n) - return n + return n, np.zeros(1024 * 1024, dtype=np.uint8) def g(n): async def _g(_n): diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index c96dbeb26..5046ff720 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -13,10 +13,6 @@ def env_integer(key, default): return default -def direct_call_enabled(): - return bool(int(os.environ.get("RAY_FORCE_DIRECT", "1"))) - - ID_SIZE = 20 # The default maximum number of bytes to allocate to the object store unless diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 36014dd89..02f3807f9 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -2,7 +2,6 @@ import logging from functools import wraps from ray import cloudpickle as pickle -from ray import ray_constants from ray._raylet import PythonFunctionDescriptor from ray import cross_language, Language import ray.signature @@ -96,7 +95,6 @@ class RemoteFunction: return self._remote(args=args, kwargs=kwargs) self.remote = _remote_proxy - self.direct_call_enabled = ray_constants.direct_call_enabled() def __call__(self, *args, **kwargs): raise Exception("Remote functions cannot be called directly. Instead " @@ -182,8 +180,8 @@ class RemoteFunction: if num_return_vals is None: num_return_vals = self._num_return_vals - if is_direct_call is None: - is_direct_call = self.direct_call_enabled + if is_direct_call is not None and not is_direct_call: + raise ValueError("Non-direct call tasks are no longer supported.") if max_retries is None: max_retries = self._max_retries @@ -211,7 +209,7 @@ class RemoteFunction: else: object_ids = worker.core_worker.submit_task( self._language, self._function_descriptor, list_args, - num_return_vals, is_direct_call, resources, max_retries) + num_return_vals, resources, max_retries) if len(object_ids) == 1: return object_ids[0] diff --git a/python/ray/tests/test_actor_resources.py b/python/ray/tests/test_actor_resources.py index 74ce411db..f6595254c 100644 --- a/python/ray/tests/test_actor_resources.py +++ b/python/ray/tests/test_actor_resources.py @@ -12,9 +12,6 @@ import time import ray import ray.test_utils import ray.cluster_utils -from ray import ray_constants - -RAY_FORCE_DIRECT = ray_constants.direct_call_enabled() def test_actor_deletion_with_gpus(shutdown_only): @@ -83,94 +80,6 @@ def test_actor_class_methods(ray_start_regular): assert ray.get(a.g.remote(2)) == 4 -@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="no actor method resources") -def test_resource_assignment(shutdown_only): - """Test to make sure that we assign resource to actors at instantiation.""" - # This test will create 16 actors. Declaring this many CPUs initially will - # speed up the test because the workers will be started ahead of time. - ray.init( - num_cpus=16, - num_gpus=1, - resources={"Custom": 1}, - object_store_memory=int(150 * 1024 * 1024)) - - class Actor: - def __init__(self): - self.resources = ray.get_resource_ids() - - def get_actor_resources(self): - return self.resources - - def get_actor_method_resources(self): - return ray.get_resource_ids() - - decorator_resource_args = [{}, { - "num_cpus": 0.1 - }, { - "num_gpus": 0.1 - }, { - "resources": { - "Custom": 0.1 - } - }] - instantiation_resource_args = [{}, { - "num_cpus": 0.2 - }, { - "num_gpus": 0.2 - }, { - "resources": { - "Custom": 0.2 - } - }] - for decorator_args in decorator_resource_args: - for instantiation_args in instantiation_resource_args: - if len(decorator_args) == 0: - actor_class = ray.remote(Actor) - else: - actor_class = ray.remote(**decorator_args)(Actor) - actor = actor_class._remote(**instantiation_args) - actor_resources = ray.get(actor.get_actor_resources.remote()) - actor_method_resources = ray.get( - actor.get_actor_method_resources.remote()) - if len(decorator_args) == 0 and len(instantiation_args) == 0: - assert len(actor_resources) == 0, ( - "Actor should not be assigned resources.") - assert list(actor_method_resources.keys()) == [ - "CPU" - ], ("Actor method should only have CPUs") - assert actor_method_resources["CPU"][0][1] == 1, ( - "Actor method should default to one cpu.") - else: - if ("num_cpus" not in decorator_args - and "num_cpus" not in instantiation_args): - assert actor_resources["CPU"][0][1] == 1, ( - "Actor should default to one cpu.") - correct_resources = {} - defined_resources = decorator_args.copy() - defined_resources.update(instantiation_args) - for resource, value in defined_resources.items(): - if resource == "num_cpus": - correct_resources["CPU"] = value - elif resource == "num_gpus": - correct_resources["GPU"] = value - elif resource == "resources": - for custom_resource, amount in value.items(): - correct_resources[custom_resource] = amount - for resource, amount in correct_resources.items(): - assert (actor_resources[resource][0][0] == - actor_method_resources[resource][0][0]), ( - "Should have assigned same {} for both actor ", - "and actor method.".format(resource)) - assert (actor_resources[resource][0][ - 1] == actor_method_resources[resource][0][1]), ( - "Should have assigned same amount of {} for both ", - "actor and actor method.".format(resource)) - assert actor_resources[resource][0][1] == amount, ( - "Actor should have {amount} {resource} but has ", - "{amount} {resource}".format( - amount=amount, resource=resource)) - - @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") diff --git a/python/ray/tests/test_advanced.py b/python/ray/tests/test_advanced.py index 10bfa5890..fa3707c7e 100644 --- a/python/ray/tests/test_advanced.py +++ b/python/ray/tests/test_advanced.py @@ -12,7 +12,6 @@ import numpy as np import pytest import ray -import ray.ray_constants as ray_constants import ray.cluster_utils import ray.test_utils @@ -247,7 +246,7 @@ def test_wait_cluster(ray_start_cluster): assert len(unready) == 0 -@pytest.mark.skipif(ray_constants.direct_call_enabled(), reason="TODO(ekl)") +@pytest.mark.skip(reason="TODO(ekl)") def test_object_transfer_dump(ray_start_cluster): cluster = ray_start_cluster @@ -507,93 +506,6 @@ def test_multithreading(ray_start_2_cpus): ray.get(actor.join.remote()) == "ok" -@pytest.mark.skipif( - ray_constants.direct_call_enabled(), reason="uses task and object table") -def test_free_objects_multi_node(ray_start_cluster): - # This test will do following: - # 1. Create 3 raylets that each hold an actor. - # 2. Each actor creates an object which is the deletion target. - # 3. Wait 0.1 second for the objects to be deleted. - # 4. Check that the deletion targets have been deleted. - # Caution: if remote functions are used instead of actor methods, - # one raylet may create more than one worker to execute the - # tasks, so the flushing operations may be executed in different - # workers and the plasma client holding the deletion target - # may not be flushed. - cluster = ray_start_cluster - config = json.dumps({"object_manager_repeated_push_delay_ms": 1000}) - for i in range(3): - cluster.add_node( - num_cpus=1, - resources={"Custom{}".format(i): 1}, - _internal_config=config) - ray.init(address=cluster.address) - - class RawActor: - def get(self): - return ray.worker.global_worker.node.unique_id - - ActorOnNode0 = ray.remote(resources={"Custom0": 1})(RawActor) - ActorOnNode1 = ray.remote(resources={"Custom1": 1})(RawActor) - ActorOnNode2 = ray.remote(resources={"Custom2": 1})(RawActor) - - def create(actors): - a = actors[0].get.remote() - b = actors[1].get.remote() - c = actors[2].get.remote() - (l1, l2) = ray.wait([a, b, c], num_returns=3) - assert len(l1) == 3 - assert len(l2) == 0 - return (a, b, c) - - def run_one_test(actors, local_only, delete_creating_tasks): - (a, b, c) = create(actors) - # The three objects should be generated on different object stores. - assert ray.get(a) != ray.get(b) - assert ray.get(a) != ray.get(c) - assert ray.get(c) != ray.get(b) - ray.internal.free( - [a, b, c], - local_only=local_only, - delete_creating_tasks=delete_creating_tasks) - # Wait for the objects to be deleted. - time.sleep(0.1) - return (a, b, c) - - actors = [ - ActorOnNode0.remote(), - ActorOnNode1.remote(), - ActorOnNode2.remote() - ] - # Case 1: run this local_only=False. All 3 objects will be deleted. - (a, b, c) = run_one_test(actors, False, False) - (l1, l2) = ray.wait([a, b, c], timeout=0.01, num_returns=1) - # All the objects are deleted. - assert len(l1) == 0 - assert len(l2) == 3 - # Case 2: run this local_only=True. Only 1 object will be deleted. - (a, b, c) = run_one_test(actors, True, False) - (l1, l2) = ray.wait([a, b, c], timeout=0.01, num_returns=3) - # One object is deleted and 2 objects are not. - assert len(l1) == 2 - assert len(l2) == 1 - # The deleted object will have the same store with the driver. - local_return = ray.worker.global_worker.node.unique_id - for object_id in l1: - assert ray.get(object_id) != local_return - - # Case3: These cases test the deleting creating tasks for the object. - (a, b, c) = run_one_test(actors, False, False) - task_table = ray.tasks() - for obj in [a, b, c]: - assert ray._raylet.compute_task_id(obj).hex() in task_table - - (a, b, c) = run_one_test(actors, False, True) - task_table = ray.tasks() - for obj in [a, b, c]: - assert ray._raylet.compute_task_id(obj).hex() not in task_table - - def test_local_mode(shutdown_only): @ray.remote def local_mode_f(): diff --git a/python/ray/tests/test_advanced_3.py b/python/ray/tests/test_advanced_3.py index b5cdfcdef..1956f7d20 100644 --- a/python/ray/tests/test_advanced_3.py +++ b/python/ray/tests/test_advanced_3.py @@ -15,7 +15,6 @@ import pickle import pytest import ray -from ray import signature import ray.ray_constants as ray_constants import ray.cluster_utils import ray.test_utils @@ -191,69 +190,6 @@ def test_global_state_api(shutdown_only): assert job_table[0]["NodeManagerAddress"] == node_ip_address -@pytest.mark.skipif( - ray_constants.direct_call_enabled(), - reason="object and task API not supported") -def test_global_state_task_object_api(shutdown_only): - ray.init() - - job_id = ray.utils.compute_job_id_from_driver( - ray.WorkerID(ray.worker.global_worker.worker_id)) - driver_task_id = ray.worker.global_worker.current_task_id.hex() - - nil_actor_id_hex = ray.ActorID.nil().hex() - - @ray.remote - def f(*xs): - return 1 - - x_id = ray.put(1) - result_id = f.remote(1, "hi", x_id) - - # Wait for one additional task to complete. - wait_for_num_tasks(1 + 1) - task_table = ray.tasks() - assert len(task_table) == 1 + 1 - task_id_set = set(task_table.keys()) - task_id_set.remove(driver_task_id) - task_id = list(task_id_set)[0] - - task_spec = task_table[task_id]["TaskSpec"] - assert task_spec["ActorID"] == nil_actor_id_hex - assert task_spec["Args"] == [ - signature.DUMMY_TYPE, 1, signature.DUMMY_TYPE, "hi", - signature.DUMMY_TYPE, x_id - ] - assert task_spec["JobID"] == job_id.hex() - assert task_spec["ReturnObjectIDs"] == [result_id] - - assert task_table[task_id] == ray.tasks(task_id) - - # Wait for two objects, one for the x_id and one for result_id. - wait_for_num_objects(2) - - def wait_for_object_table(): - timeout = 10 - start_time = time.time() - while time.time() - start_time < timeout: - object_table = ray.objects() - tables_ready = (object_table[x_id]["ManagerIDs"] is not None and - object_table[result_id]["ManagerIDs"] is not None) - if tables_ready: - return - time.sleep(0.1) - raise RayTestTimeoutException( - "Timed out while waiting for object table to " - "update.") - - object_table = ray.objects() - assert len(object_table) == 2 - - assert object_table[x_id] == ray.objects(x_id) - object_table_entry = ray.objects(result_id) - assert object_table[result_id] == object_table_entry - - # TODO(rkn): Pytest actually has tools for capturing stdout and stderr, so we # should use those, but they seem to conflict with Ray's use of faulthandler. class CaptureOutputAndError: diff --git a/python/ray/tests/test_asyncio.py b/python/ray/tests/test_asyncio.py index ebf544011..3872cf193 100644 --- a/python/ray/tests/test_asyncio.py +++ b/python/ray/tests/test_asyncio.py @@ -22,7 +22,7 @@ def test_asyncio_actor(ray_start_regular_shared): await self.event.wait() return sorted(self.batch) - a = AsyncBatcher.options(is_direct_call=True).remote() + a = AsyncBatcher.remote() x1 = a.add.remote(1) x2 = a.add.remote(2) x3 = a.add.remote(3) @@ -42,7 +42,7 @@ def test_asyncio_actor_same_thread(ray_start_regular_shared): async def async_thread_id(self): return threading.current_thread().ident - a = Actor.options(is_direct_call=True).remote() + a = Actor.remote() sync_id, async_id = ray.get( [a.sync_thread_id.remote(), a.async_thread_id.remote()]) @@ -66,7 +66,7 @@ def test_asyncio_actor_concurrency(ray_start_regular_shared): num_calls = 10 - a = RecordOrder.options(is_direct_call=True, max_concurrency=1).remote() + a = RecordOrder.options(max_concurrency=1).remote() ray.get([a.do_work.remote() for _ in range(num_calls)]) history = ray.get(a.get_history.remote()) @@ -99,8 +99,8 @@ def test_asyncio_actor_high_concurrency(ray_start_regular_shared): return sorted(self.batch) batch_size = sys.getrecursionlimit() * 4 - actor = AsyncConcurrencyBatcher.options( - max_concurrency=batch_size * 2, is_direct_call=True).remote(batch_size) + actor = AsyncConcurrencyBatcher.options(max_concurrency=batch_size * + 2).remote(batch_size) result = ray.get([actor.add.remote(i) for i in range(batch_size)]) assert result[0] == list(range(batch_size)) assert result[-1] == list(range(batch_size)) @@ -130,11 +130,11 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop): with pytest.raises(ray.exceptions.RayTaskError): await ray.async_compat.get_async(task_throws.remote()) - # Test Direct Actor Call + # Test actor calls. str_len = 200 * 1024 @ray.remote - class DirectActor: + class Actor: def echo(self, i): return i @@ -145,18 +145,17 @@ async def test_asyncio_get(ray_start_regular_shared, event_loop): def throw_error(self): 1 / 0 - direct = DirectActor.options(is_direct_call=True).remote() + actor = Actor.remote() - direct_actor_call_future = ray.async_compat.get_async( - direct.echo.remote(2)) - assert await direct_actor_call_future == 2 + actor_call_future = ray.async_compat.get_async(actor.echo.remote(2)) + assert await actor_call_future == 2 promoted_to_plasma_future = ray.async_compat.get_async( - direct.big_object.remote()) + actor.big_object.remote()) assert await promoted_to_plasma_future == "a" * str_len with pytest.raises(ray.exceptions.RayTaskError): - await ray.async_compat.get_async(direct.throw_error.remote()) + await ray.async_compat.get_async(actor.throw_error.remote()) def test_asyncio_actor_async_get(ray_start_regular_shared): diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 9e504cf70..1de71bc1c 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1348,27 +1348,6 @@ def test_get_with_timeout(ray_start_regular): assert time.time() - start < 30 -@pytest.mark.parametrize( - "ray_start_cluster", [{ - "num_cpus": 1, - "num_nodes": 1, - }, { - "num_cpus": 1, - "num_nodes": 2, - }], - indirect=True) -def test_direct_call_simple(ray_start_cluster): - @ray.remote - def f(x): - return x + 1 - - f_direct = f.options(is_direct_call=True) - assert ray.get(f_direct.remote(2)) == 3 - for _ in range(10): - assert ray.get([f_direct.remote(i) for i in range(100)]) == list( - range(1, 101)) - - # https://github.com/ray-project/ray/issues/6329 def test_call_actors_indirect_through_tasks(ray_start_regular): @ray.remote @@ -1399,28 +1378,7 @@ def test_call_actors_indirect_through_tasks(ray_start_regular): ray.get(zoo.remote([c])) -def test_direct_call_refcount(ray_start_regular): - @ray.remote - def f(x): - return x + 1 - - @ray.remote - def sleep(): - time.sleep(.1) - return 1 - - # Multiple gets should not hang with ref counting enabled. - f_direct = f.options(is_direct_call=True) - x = f_direct.remote(2) - ray.get(x) - ray.get(x) - - # Temporary objects should be retained for chained callers. - y = f_direct.remote(sleep.options(is_direct_call=True).remote()) - assert ray.get(y) == 2 - - -def test_direct_call_matrix(shutdown_only): +def test_call_matrix(shutdown_only): ray.init(object_store_memory=1000 * 1024 * 1024) @ray.remote @@ -1456,23 +1414,23 @@ def test_direct_call_matrix(shutdown_only): if is_large else "small_object", "out_of_band" if out_of_band else "in_band") if source_actor: - a = Actor.options(is_direct_call=True).remote() + a = Actor.remote() if is_large: x_id = a.large_value.remote() else: x_id = a.small_value.remote() else: if is_large: - x_id = large_value.options(is_direct_call=True).remote() + x_id = large_value.remote() else: - x_id = small_value.options(is_direct_call=True).remote() + x_id = small_value.remote() if out_of_band: x_id = [x_id] if dest_actor: - b = Actor.options(is_direct_call=True).remote() + b = Actor.remote() x = ray.get(b.echo.remote(x_id)) else: - x = ray.get(echo.options(is_direct_call=True).remote(x_id)) + x = ray.get(echo.remote(x_id)) if is_large: assert isinstance(x, np.ndarray) else: @@ -1494,19 +1452,18 @@ def test_direct_call_matrix(shutdown_only): "num_nodes": 2, }], indirect=True) -def test_direct_call_chain(ray_start_cluster): +def test_call_chain(ray_start_cluster): @ray.remote def g(x): return x + 1 - g_direct = g.options(is_direct_call=True) x = 0 for _ in range(100): - x = g_direct.remote(x) + x = g.remote(x) assert ray.get(x) == 100 -def test_direct_inline_arg_memory_corruption(ray_start_regular): +def test_inline_arg_memory_corruption(ray_start_regular): @ray.remote def f(): return np.zeros(1000, dtype=np.uint8) @@ -1521,13 +1478,12 @@ def test_direct_inline_arg_memory_corruption(ray_start_regular): for prev in self.z: assert np.sum(prev) == 0, ("memory corruption detected", prev) - a = Actor.options(is_direct_call=True).remote() - f_direct = f.options(is_direct_call=True) + a = Actor.remote() for i in range(100): - ray.get(a.add.remote(f_direct.remote())) + ray.get(a.add.remote(f.remote())) -def test_direct_actor_enabled(ray_start_regular): +def test_skip_plasma(ray_start_regular): @ray.remote class Actor: def __init__(self): @@ -1536,14 +1492,14 @@ def test_direct_actor_enabled(ray_start_regular): def f(self, x): return x * 2 - a = Actor._remote(is_direct_call=True) + a = Actor.remote() obj_id = a.f.remote(1) # it is not stored in plasma assert not ray.worker.global_worker.core_worker.object_exists(obj_id) assert ray.get(obj_id) == 2 -def test_direct_actor_order(shutdown_only): +def test_actor_call_order(shutdown_only): ray.init(num_cpus=4) @ray.remote @@ -1561,14 +1517,12 @@ def test_direct_actor_order(shutdown_only): self.count += 1 return count - a = Actor._remote(is_direct_call=True) - assert ray.get([ - a.inc.remote(i, small_value.options(is_direct_call=True).remote()) - for i in range(100) - ]) == list(range(100)) + a = Actor.remote() + assert ray.get([a.inc.remote(i, small_value.remote()) + for i in range(100)]) == list(range(100)) -def test_direct_actor_large_objects(ray_start_regular): +def test_actor_large_objects(ray_start_regular): @ray.remote class Actor: def __init__(self): @@ -1578,7 +1532,7 @@ def test_direct_actor_large_objects(ray_start_regular): time.sleep(1) return np.zeros(10000000) - a = Actor._remote(is_direct_call=True) + a = Actor.remote() obj_id = a.f.remote() assert not ray.worker.global_worker.core_worker.object_exists(obj_id) done, _ = ray.wait([obj_id]) @@ -1587,7 +1541,7 @@ def test_direct_actor_large_objects(ray_start_regular): assert isinstance(ray.get(obj_id), np.ndarray) -def test_direct_actor_pass_by_ref(ray_start_regular): +def test_actor_pass_by_ref(ray_start_regular): @ray.remote class Actor: def __init__(self): @@ -1604,7 +1558,7 @@ def test_direct_actor_pass_by_ref(ray_start_regular): def error(): sys.exit(0) - a = Actor._remote(is_direct_call=True) + a = Actor.remote() assert ray.get(a.f.remote(f.remote(1))) == 2 fut = [a.f.remote(f.remote(i)) for i in range(100)] @@ -1615,7 +1569,7 @@ def test_direct_actor_pass_by_ref(ray_start_regular): ray.get(a.f.remote(error.remote())) -def test_direct_actor_pass_by_ref_order_optimization(shutdown_only): +def test_actor_pass_by_ref_order_optimization(shutdown_only): ray.init(num_cpus=4) @ray.remote @@ -1626,7 +1580,7 @@ def test_direct_actor_pass_by_ref_order_optimization(shutdown_only): def f(self, x): pass - a = Actor._remote(is_direct_call=True) + a = Actor.remote() @ray.remote def fast_value(): @@ -1652,7 +1606,7 @@ def test_direct_actor_pass_by_ref_order_optimization(shutdown_only): assert delta < 10, "did not skip slow value" -def test_direct_actor_recursive(ray_start_regular): +def test_actor_recursive(ray_start_regular): @ray.remote class Actor: def __init__(self, delegate=None): @@ -1663,9 +1617,9 @@ def test_direct_actor_recursive(ray_start_regular): return ray.get(self.delegate.f.remote(x)) return x * 2 - a = Actor._remote(is_direct_call=True) - b = Actor._remote(args=[a], is_direct_call=True) - c = Actor._remote(args=[b], is_direct_call=True) + a = Actor.remote() + b = Actor.remote(a) + c = Actor.remote(b) result = ray.get([c.f.remote(i) for i in range(100)]) assert result == [x * 2 for x in range(100)] @@ -1675,7 +1629,7 @@ def test_direct_actor_recursive(ray_start_regular): assert result == [x * 2 for x in range(100)] -def test_direct_actor_concurrent(ray_start_regular): +def test_actor_concurrent(ray_start_regular): @ray.remote class Batcher: def __init__(self): @@ -1690,7 +1644,7 @@ def test_direct_actor_concurrent(ray_start_regular): self.event.wait() return sorted(self.batch) - a = Batcher.options(is_direct_call=True, max_concurrency=3).remote() + a = Batcher.options(max_concurrency=3).remote() x1 = a.add.remote(1) x2 = a.add.remote(2) x3 = a.add.remote(3) diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index bdace4d54..6a75eed39 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -21,8 +21,6 @@ from ray.test_utils import ( RayTestTimeoutException, ) -RAY_FORCE_DIRECT = ray_constants.direct_call_enabled() - def test_failed_task(ray_start_regular): @ray.remote @@ -121,13 +119,13 @@ def test_get_throws_quickly_when_found_exception(ray_start_regular): assert err.type is exception f = random_path() - actor = Actor.options(is_direct_call=True, max_concurrency=2).remote() + actor = Actor.options(max_concurrency=2).remote() expect_exception([actor.bad_func1.remote(), actor.slow_func.remote(f)], ray.exceptions.RayTaskError) touch(f) f = random_path() - actor = Actor.options(is_direct_call=True, max_concurrency=2).remote() + actor = Actor.options(max_concurrency=2).remote() expect_exception([actor.bad_func2.remote(), actor.slow_func.remote(f)], ray.exceptions.RayActorError) touch(f) @@ -590,7 +588,7 @@ def test_export_large_objects(ray_start_regular): wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 2) -@pytest.mark.skipif(RAY_FORCE_DIRECT, reason="TODO detect resource deadlock") +@pytest.mark.skip(reason="TODO detect resource deadlock") def test_warning_for_resource_deadlock(shutdown_only): # Check that we get warning messages for infeasible tasks. ray.init(num_cpus=1) @@ -955,7 +953,7 @@ def test_fill_object_store_exception(shutdown_only): "num_cpus": 1, }], indirect=True) -def test_direct_call_eviction(ray_start_cluster): +def test_eviction(ray_start_cluster): @ray.remote def large_object(): return np.zeros(10 * 1024 * 1024) @@ -989,7 +987,7 @@ def test_direct_call_eviction(ray_start_cluster): "num_cpus": 1, }], indirect=True) -def test_direct_call_serialized_id_eviction(ray_start_cluster): +def test_serialized_id_eviction(ray_start_cluster): @ray.remote def large_object(): return np.zeros(10 * 1024 * 1024) diff --git a/python/ray/tests/test_multi_node.py b/python/ray/tests/test_multi_node.py index 1d397f43f..b1b11304b 100644 --- a/python/ray/tests/test_multi_node.py +++ b/python/ray/tests/test_multi_node.py @@ -4,7 +4,6 @@ import subprocess import time import ray -from ray import ray_constants from ray.test_utils import ( RayTestTimeoutException, run_string_as_driver, @@ -480,9 +479,7 @@ print("success") assert "success" in out -@pytest.mark.skipif( - ray_constants.direct_call_enabled(), - reason="fate sharing not implemented yet") +@pytest.mark.skip(reason="fate sharing not implemented yet") def test_driver_exiting_when_worker_blocked(call_ray_start): # This test will create some drivers that submit some tasks and then # exit without waiting for the tasks to complete. diff --git a/python/ray/tests/test_multinode_failures.py b/python/ray/tests/test_multinode_failures.py index 9fa6f306e..ba54006be 100644 --- a/python/ray/tests/test_multinode_failures.py +++ b/python/ray/tests/test_multinode_failures.py @@ -11,8 +11,6 @@ import ray.ray_constants as ray_constants from ray.cluster_utils import Cluster from ray.test_utils import RayTestTimeoutException -RAY_FORCE_DIRECT = ray_constants.direct_call_enabled() - @pytest.fixture(params=[(1, 4), (4, 4)]) def ray_start_workers_separate_multinode(request): @@ -80,20 +78,16 @@ def _test_component_failed(cluster, component_type): # Submit many tasks with many dependencies. @ray.remote def f(x): - if RAY_FORCE_DIRECT: - # Sleep to make sure that tasks actually fail mid-execution. We - # only use it for direct calls because the test already takes a - # long time to run with the raylet codepath. - time.sleep(0.01) + # Sleep to make sure that tasks actually fail mid-execution. + time.sleep(0.01) return x @ray.remote def g(*xs): - if RAY_FORCE_DIRECT: - # Sleep to make sure that tasks actually fail mid-execution. We - # only use it for direct calls because the test already takes a - # long time to run with the raylet codepath. - time.sleep(0.01) + # Sleep to make sure that tasks actually fail mid-execution. We + # only use it for direct calls because the test already takes a + # long time to run with the raylet codepath. + time.sleep(0.01) return 1 # Kill the component on all nodes except the head node as the tasks @@ -151,7 +145,7 @@ def check_components_alive(cluster, component_type, check_component_alive): "num_nodes": 4, "_internal_config": json.dumps({ # Raylet codepath is not stable with a shorter timeout. - "num_heartbeats_timeout": 10 if RAY_FORCE_DIRECT else 100 + "num_heartbeats_timeout": 10 }), }], indirect=True) @@ -175,7 +169,7 @@ def test_raylet_failed(ray_start_cluster): "num_nodes": 2, "_internal_config": json.dumps({ # Raylet codepath is not stable with a shorter timeout. - "num_heartbeats_timeout": 10 if RAY_FORCE_DIRECT else 100 + "num_heartbeats_timeout": 10 }), }], indirect=True) diff --git a/python/ray/tests/test_multinode_failures_2.py b/python/ray/tests/test_multinode_failures_2.py index 5bc111c10..6284bbcae 100644 --- a/python/ray/tests/test_multinode_failures_2.py +++ b/python/ray/tests/test_multinode_failures_2.py @@ -9,12 +9,8 @@ import pytest import ray import ray.ray_constants as ray_constants -RAY_FORCE_DIRECT = ray_constants.direct_call_enabled() - -@pytest.mark.skipif( - RAY_FORCE_DIRECT, - reason="No reconstruction for objects placed in plasma yet") +@pytest.mark.skip(reason="No reconstruction for objects placed in plasma yet") @pytest.mark.parametrize( "ray_start_cluster", [{ @@ -24,7 +20,7 @@ RAY_FORCE_DIRECT = ray_constants.direct_call_enabled() "object_store_memory": 1000 * 1024 * 1024, "_internal_config": json.dumps({ # Raylet codepath is not stable with a shorter timeout. - "num_heartbeats_timeout": 10 if RAY_FORCE_DIRECT else 100, + "num_heartbeats_timeout": 10, "object_manager_pull_timeout_ms": 1000, "object_manager_push_timeout_ms": 1000, "object_manager_repeated_push_delay_ms": 1000, diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index 9e173fc82..5a811fc93 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -7,14 +7,8 @@ import time import warnings import ray -from ray import ray_constants from ray.cluster_utils import Cluster -# TODO(yuhguo): This test file requires a lot of CPU/memory, and -# better be put in Jenkins. However, it fails frequently in Jenkins, but -# works well in Travis. We should consider moving it back to Jenkins once -# we figure out the reason. - if (multiprocessing.cpu_count() < 40 or ray.utils.get_system_memory() < 50 * 10**9): warnings.warn("This test must be run on large machines.") @@ -42,7 +36,7 @@ def ray_start_cluster_with_resource(): # This test is here to make sure that when we broadcast an object to a bunch of # machines, we don't have too many excess object transfers. -@pytest.mark.skipif(ray_constants.direct_call_enabled(), reason="TODO(ekl)") +@pytest.mark.skip(reason="TODO(ekl)") def test_object_broadcast(ray_start_cluster_with_resource): cluster, num_nodes = ray_start_cluster_with_resource diff --git a/python/ray/tests/test_unreconstructable_errors.py b/python/ray/tests/test_unreconstructable_errors.py index 70c6ecc31..c61fbdd58 100644 --- a/python/ray/tests/test_unreconstructable_errors.py +++ b/python/ray/tests/test_unreconstructable_errors.py @@ -2,7 +2,6 @@ import numpy as np import unittest import ray -from ray import ray_constants class TestUnreconstructableErrors(unittest.TestCase): @@ -23,25 +22,6 @@ class TestUnreconstructableErrors(unittest.TestCase): self.assertRaises(ray.exceptions.UnreconstructableError, lambda: ray.get(x_id)) - def testLineageEvictedReconstructionFails(self): - if ray_constants.direct_call_enabled(): - return # not relevant - - @ray.remote - def f(data): - return 0 - - x_id = f.remote(None) - ray.get(x_id) - # Hold references to the ray.put objects so they aren't LRU'd. - oids = [] - for _ in range(400): - new_oids = [f.remote(np.zeros(10000)) for _ in range(50)] - oids.extend(new_oids) - ray.get(new_oids) - self.assertRaises(ray.exceptions.UnreconstructableError, - lambda: ray.get(x_id)) - if __name__ == "__main__": import pytest