diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 888971966..f19a44255 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -72,7 +72,6 @@ from ray.state import (global_state, jobs, nodes, tasks, objects, timeline, available_resources, errors) # noqa: E402 from ray.worker import ( LOCAL_MODE, - PYTHON_MODE, SCRIPT_MODE, WORKER_MODE, connect, diff --git a/python/ray/actor.py b/python/ray/actor.py index 3361b29c4..2df1adf4e 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -526,40 +526,41 @@ class ActorHandle(object): kwargs = {} args = signature.extend_args(function_signature, args, kwargs) - # Execute functions locally if Ray is run in LOCAL_MODE - # Copy args to prevent the function from mutating them. - if worker.mode == ray.LOCAL_MODE: - return getattr(worker.actors[self._ray_actor_id], - method_name)(*copy.deepcopy(args)) - function_descriptor = FunctionDescriptor( self._ray_module_name, method_name, self._ray_class_name) - with self._ray_actor_lock: - object_ids = worker.submit_task( - function_descriptor, - args, - actor_id=self._ray_actor_id, - actor_handle_id=self._ray_actor_handle_id, - actor_counter=self._ray_actor_counter, - actor_creation_dummy_object_id=( - self._ray_actor_creation_dummy_object_id), - execution_dependencies=[self._ray_actor_cursor], - new_actor_handles=self._ray_new_actor_handles, - # We add one for the dummy return ID. - num_return_vals=num_return_vals + 1, - resources={"CPU": self._ray_actor_method_cpus}, - placement_resources={}, - job_id=self._ray_actor_job_id, - ) - # Update the actor counter and cursor to reflect the most recent - # invocation. - self._ray_actor_counter += 1 - # The last object returned is the dummy object that should be - # passed in to the next actor method. Do not return it to the user. - self._ray_actor_cursor = object_ids.pop() - # We have notified the backend of the new actor handles to expect - # since the last task was submitted, so clear the list. 
- self._ray_new_actor_handles = [] + + if worker.mode == ray.LOCAL_MODE: + function = getattr(worker.actors[self._ray_actor_id], method_name) + object_ids = worker.local_mode_manager.execute( + function, function_descriptor, args, num_return_vals) + else: + with self._ray_actor_lock: + object_ids = worker.submit_task( + function_descriptor, + args, + actor_id=self._ray_actor_id, + actor_handle_id=self._ray_actor_handle_id, + actor_counter=self._ray_actor_counter, + actor_creation_dummy_object_id=( + self._ray_actor_creation_dummy_object_id), + execution_dependencies=[self._ray_actor_cursor], + new_actor_handles=self._ray_new_actor_handles, + # We add one for the dummy return ID. + num_return_vals=num_return_vals + 1, + resources={"CPU": self._ray_actor_method_cpus}, + placement_resources={}, + job_id=self._ray_actor_job_id, + ) + # Update the actor counter and cursor to reflect the most + # recent invocation. + self._ray_actor_counter += 1 + # The last object returned is the dummy object that should be + # passed in to the next actor method. Do not return it to the + # user. + self._ray_actor_cursor = object_ids.pop() + # We have notified the backend of the new actor handles to + # expect since the last task was submitted, so clear the list. 
+ self._ray_new_actor_handles = [] if len(object_ids) == 1: object_ids = object_ids[0] diff --git a/python/ray/internal/internal_api.py b/python/ray/internal/internal_api.py index 95d2e2a3b..9a5a98187 100644 --- a/python/ray/internal/internal_api.py +++ b/python/ray/internal/internal_api.py @@ -30,9 +30,6 @@ def free(object_ids, local_only=False, delete_creating_tasks=False): """ worker = ray.worker.get_global_worker() - if ray.worker._mode() == ray.worker.LOCAL_MODE: - return - if isinstance(object_ids, ray.ObjectID): object_ids = [object_ids] @@ -46,6 +43,10 @@ def free(object_ids, local_only=False, delete_creating_tasks=False): raise TypeError("Attempting to call `free` on the value {}, " "which is not an ray.ObjectID.".format(object_id)) + if ray.worker._mode() == ray.worker.LOCAL_MODE: + worker.local_mode_manager.free(object_ids) + return + worker.check_connected() with profiling.profile("ray.free"): if len(object_ids) == 0: diff --git a/python/ray/local_mode_manager.py b/python/ray/local_mode_manager.py new file mode 100644 index 000000000..50b27d47c --- /dev/null +++ b/python/ray/local_mode_manager.py @@ -0,0 +1,130 @@ +from __future__ import absolute_import +from __future__ import division +from __future__ import print_function + +import copy +import traceback + +from ray import ObjectID +from ray.utils import format_error_message +from ray.exceptions import RayTaskError + + +class LocalModeObjectID(ObjectID): + """Wrapper class around ray.ObjectID used for local mode. + + Object values are stored directly as a field of the LocalModeObjectID. + + Attributes: + value: Field that stores object values. If this field does not exist, + it equates to the object not existing in the object store. This is + necessary because None is a valid object value. 
+ """ + pass + + +class LocalModeManager(object): + """Used to emulate remote operations when running in local mode.""" + + def __init__(self): + """Initialize a LocalModeManager.""" + + def execute(self, function, function_descriptor, args, num_return_vals): + """Synchronously executes a "remote" function or actor method. + + Stores results directly in the generated and returned + LocalModeObjectIDs. Any exceptions raised during function execution + will be stored under all returned object IDs and later raised by the + worker. + + Args: + function: The function to execute. + function_descriptor: Metadata about the function. + args: Arguments to the function. These will not be modified by + the function execution. + num_return_vals: Number of expected return values specified in the + function's decorator. + + Returns: + LocalModeObjectIDs corresponding to the function return values. + """ + object_ids = [ + LocalModeObjectID.from_random() for _ in range(num_return_vals) + ] + try: + results = function(*copy.deepcopy(args)) + if num_return_vals == 1: + object_ids[0].value = results + else: + for object_id, result in zip(object_ids, results): + object_id.value = result + except Exception: + function_name = function_descriptor.function_name + backtrace = format_error_message(traceback.format_exc()) + task_error = RayTaskError(function_name, backtrace) + for object_id in object_ids: + object_id.value = task_error + + return object_ids + + def put_object(self, value): + """Store an object in the emulated object store. + + Implemented by generating a LocalModeObjectID and storing the value + directly within it. + + Args: + value: The value to store. + + Returns: + LocalModeObjectID corresponding to the value. + """ + object_id = LocalModeObjectID.from_random() + object_id.value = value + return object_id + + def get_object(self, object_ids): + """Fetch objects from the emulated object store. + + Accepts only LocalModeObjectIDs and reads values directly from them. 
+ + Args: + object_ids: A list of object IDs to fetch values for. + + Raises: + TypeError if any of the object IDs are not LocalModeObjectIDs. + KeyError if any of the object IDs do not contain values. + """ + results = [] + for object_id in object_ids: + if not isinstance(object_id, LocalModeObjectID): + raise TypeError("Only LocalModeObjectIDs are supported " + "when running in LOCAL_MODE. Using " + "user-generated ObjectIDs will fail.") + if not hasattr(object_id, "value"): + raise KeyError("Value for {} not found".format(object_id)) + + results.append(object_id.value) + + return results + + def free(self, object_ids): + """Delete objects from the emulated object store. + + Accepts only LocalModeObjectIDs and deletes their values directly. + + Args: + object_ids: A list of ObjectIDs to delete. + + Raises: + TypeError if any of the object IDs are not LocalModeObjectIDs. + """ + for object_id in object_ids: + if not isinstance(object_id, LocalModeObjectID): + raise TypeError("Only LocalModeObjectIDs are supported " + "when running in LOCAL_MODE. Using " + "user-generated ObjectIDs will fail.") + try: + del object_id.value + except AttributeError: + pass diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 025218612..38a382e3c 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -2,7 +2,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import copy import logging from functools import wraps @@ -137,17 +136,16 @@ class RemoteFunction(object): kwargs) if worker.mode == ray.worker.LOCAL_MODE: - # In LOCAL_MODE, remote calls simply execute the function. - # We copy the arguments to prevent the function call from - # mutating them and to match the usual behavior of - # immutable remote objects. 
- result = self._function(*copy.deepcopy(args)) - return result - object_ids = worker.submit_task( - self._function_descriptor, - args, - num_return_vals=num_return_vals, - resources=resources) + object_ids = worker.local_mode_manager.execute( + self._function, self._function_descriptor, args, + num_return_vals) + else: + object_ids = worker.submit_task( + self._function_descriptor, + args, + num_return_vals=num_return_vals, + resources=resources) + if len(object_ids) == 1: return object_ids[0] elif len(object_ids) > 1: diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 9223a2f61..87b8043c4 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -1586,22 +1586,21 @@ def test_local_mode(shutdown_only): return np.ones([3, 4, 5]) xref = f.remote() - # Remote functions should return by value. - assert np.alltrue(xref == np.ones([3, 4, 5])) - # Check that ray.get is the identity. - assert np.alltrue(xref == ray.get(xref)) + # Remote functions should return ObjectIDs. + assert isinstance(xref, ray.ObjectID) + assert np.alltrue(ray.get(xref) == np.ones([3, 4, 5])) y = np.random.normal(size=[11, 12]) - # Check that ray.put is the identity. - assert np.alltrue(y == ray.put(y)) + # Check that ray.get(ray.put) is the identity. + assert np.alltrue(y == ray.get(ray.put(y))) # Make sure objects are immutable, this example is why we need to copy # arguments before passing them into remote functions in python mode aref = local_mode_f.remote() - assert np.alltrue(aref == np.array([0, 0])) - bref = local_mode_g.remote(aref) + assert np.alltrue(ray.get(aref) == np.array([0, 0])) + bref = local_mode_g.remote(ray.get(aref)) # Make sure local_mode_g does not mutate aref. 
- assert np.alltrue(aref == np.array([0, 0])) - assert np.alltrue(bref == np.array([1, 0])) + assert np.alltrue(ray.get(aref) == np.array([0, 0])) + assert np.alltrue(ray.get(bref) == np.array([1, 0])) # wait should return the first num_returns values passed in as the # first list and the remaining values as the second list @@ -1612,6 +1611,25 @@ def test_local_mode(shutdown_only): assert ready == object_ids[:num_returns] assert remaining == object_ids[num_returns:] + # Check that ray.put() and ray.internal.free() work in local mode. + + v1 = np.ones(10) + v2 = np.zeros(10) + + k1 = ray.put(v1) + assert np.alltrue(v1 == ray.get(k1)) + k2 = ray.put(v2) + assert np.alltrue(v2 == ray.get(k2)) + + ray.internal.free([k1, k2]) + with pytest.raises(Exception): + ray.get(k1) + with pytest.raises(Exception): + ray.get(k2) + + # Should fail silently. + ray.internal.free([k1, k2]) + # Test actors in LOCAL_MODE. @ray.remote @@ -1629,9 +1647,14 @@ def test_local_mode(shutdown_only): array[0] = -1 self.array = array + @ray.method(num_return_vals=3) + def returns_multiple(self): + return 1, 2, 3 + test_actor = LocalModeTestClass.remote(np.arange(10)) - # Remote actor functions should return by value - assert np.alltrue(test_actor.get_array.remote() == np.arange(10)) + obj = test_actor.get_array.remote() + assert isinstance(obj, ray.ObjectID) + assert np.alltrue(ray.get(obj) == np.arange(10)) test_array = np.arange(10) # Remote actor functions should not mutate arguments @@ -1639,9 +1662,9 @@ def test_local_mode(shutdown_only): assert np.alltrue(test_array == np.arange(10)) # Remote actor functions should keep state test_array[0] = -1 - assert np.alltrue(test_array == test_actor.get_array.remote()) + assert np.alltrue(test_array == ray.get(test_actor.get_array.remote())) - # Check that actor handles work in Python mode. + # Check that actor handles work in local mode. 
@ray.remote def use_actor_handle(handle): @@ -1651,6 +1674,47 @@ def test_local_mode(shutdown_only): ray.get(use_actor_handle.remote(test_actor)) + # Check that exceptions are deferred until ray.get(). + + exception_str = "test_basic remote task exception" + + @ray.remote + def throws(): + raise Exception(exception_str) + + obj = throws.remote() + with pytest.raises(Exception, match=exception_str): + ray.get(obj) + + # Check that multiple return values are handled properly. + + @ray.remote(num_return_vals=3) + def returns_multiple(): + return 1, 2, 3 + + obj1, obj2, obj3 = returns_multiple.remote() + assert ray.get(obj1) == 1 + assert ray.get(obj2) == 2 + assert ray.get(obj3) == 3 + assert ray.get([obj1, obj2, obj3]) == [1, 2, 3] + + obj1, obj2, obj3 = test_actor.returns_multiple.remote() + assert ray.get(obj1) == 1 + assert ray.get(obj2) == 2 + assert ray.get(obj3) == 3 + assert ray.get([obj1, obj2, obj3]) == [1, 2, 3] + + @ray.remote(num_return_vals=2) + def returns_multiple_throws(): + raise Exception(exception_str) + + obj1, obj2 = returns_multiple_throws.remote() + # Each returned ID must raise the stored task error on its own. + with pytest.raises(Exception, match=exception_str): + ray.get(obj1) + with pytest.raises(Exception, match=exception_str): + ray.get(obj2) + def test_resource_constraints(shutdown_only): num_workers = 20 diff --git a/python/ray/worker.py b/python/ray/worker.py index 15ee79890..092061e84 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -68,11 +68,11 @@ from ray.utils import ( setup_logger, thread_safe_client, ) +from ray.local_mode_manager import LocalModeManager SCRIPT_MODE = 0 WORKER_MODE = 1 LOCAL_MODE = 2 -PYTHON_MODE = 3 ERROR_KEY_PREFIX = b"Error:" @@ -270,7 +270,7 @@ class Worker(object): The mode LOCAL_MODE should be used if this Worker is a driver and if you want to run the driver in a manner equivalent to serial Python for debugging purposes. It will not send remote function calls to the - scheduler and will insead execute them in a blocking fashion. 
+ scheduler and will instead execute them in a blocking fashion. Args: mode: One of SCRIPT_MODE, WORKER_MODE, and LOCAL_MODE. @@ -490,6 +490,10 @@ class Worker(object): Args: object_ids (List[object_id.ObjectID]): A list of the object IDs whose values should be retrieved. + + Raises: + Exception if running in LOCAL_MODE and any of the object IDs do not + exist in the emulated object store. """ # Make sure that the values are object IDs. for object_id in object_ids: @@ -497,6 +501,10 @@ class Worker(object): raise TypeError( "Attempting to call `get` on the value {}, " "which is not an ray.ObjectID.".format(object_id)) + + if self.mode == LOCAL_MODE: + return self.local_mode_manager.get_object(object_ids) + # Do an initial fetch for remote objects. We divide the fetch into # smaller fetches so as to not block the manager for a prolonged period # of time in a single call. @@ -1064,7 +1072,7 @@ def get_gpu_ids(): A list of GPU IDs. """ if _mode() == LOCAL_MODE: - raise Exception("ray.get_gpu_ids() currently does not work in PYTHON " + raise Exception("ray.get_gpu_ids() currently does not work in LOCAL " "MODE.") all_resource_ids = global_worker.raylet_client.resource_ids() @@ -1092,7 +1100,7 @@ def get_resource_ids(): """ if _mode() == LOCAL_MODE: raise Exception( - "ray.get_resource_ids() currently does not work in PYTHON " + "ray.get_resource_ids() currently does not work in LOCAL " "MODE.") return global_worker.raylet_client.resource_ids() @@ -1742,6 +1750,7 @@ def connect(node, # If running Ray in LOCAL_MODE, there is no need to create call # create_worker or to start the worker service. if mode == LOCAL_MODE: + worker.local_mode_manager = LocalModeManager() return # Create a Redis client. @@ -2174,17 +2183,12 @@ def get(object_ids): worker = global_worker worker.check_connected() with profiling.profile("ray.get"): - if worker.mode == LOCAL_MODE: - # In LOCAL_MODE, ray.get is the identity operation (the input will - # actually be a value not an objectid). 
- return object_ids - is_individual_id = isinstance(object_ids, ray.ObjectID) if is_individual_id: object_ids = [object_ids] if not isinstance(object_ids, list): - raise ValueError("'object_ids' must either by an object ID " + raise ValueError("'object_ids' must either be an object ID " "or a list of object IDs.") global last_task_error_raise_time @@ -2216,13 +2220,13 @@ def put(value): worker.check_connected() with profiling.profile("ray.put"): if worker.mode == LOCAL_MODE: - # In LOCAL_MODE, ray.put is the identity operation. - return value - object_id = ray._raylet.compute_put_id( - worker.current_task_id, - worker.task_context.put_index, - ) - worker.put_object(object_id, value) + object_id = worker.local_mode_manager.put_object(value) + else: + object_id = ray._raylet.compute_put_id( + worker.current_task_id, + worker.task_context.put_index, + ) + worker.put_object(object_id, value) worker.task_context.put_index += 1 return object_id @@ -2282,12 +2286,10 @@ def wait(object_ids, num_returns=1, timeout=None): raise ValueError("The 'timeout' argument must be nonnegative. " "Received {}".format(timeout)) - if worker.mode != LOCAL_MODE: - for object_id in object_ids: - if not isinstance(object_id, ObjectID): - raise TypeError("wait() expected a list of ray.ObjectID, " - "got list containing {}".format( - type(object_id))) + for object_id in object_ids: + if not isinstance(object_id, ObjectID): + raise TypeError("wait() expected a list of ray.ObjectID, " + "got list containing {}".format(type(object_id))) worker.check_connected() # TODO(swang): Check main thread.