From 9bfc2c4b5462871f0fe669b3aabc5f2772bc3a6f Mon Sep 17 00:00:00 2001 From: ijrsvt Date: Wed, 1 Apr 2020 13:50:57 -0700 Subject: [PATCH] Moving Local Mode to C++ (#7670) --- python/ray/_raylet.pxd | 1 + python/ray/_raylet.pyx | 39 +- python/ray/actor.py | 118 +++--- python/ray/dashboard/client/src/api.ts | 3 +- .../pages/dashboard/logical-view/Actor.tsx | 4 +- python/ray/function_manager.py | 7 +- python/ray/includes/common.pxd | 3 + python/ray/includes/libcoreworker.pxd | 3 +- python/ray/local_mode_manager.py | 152 -------- python/ray/node.py | 1 + python/ray/parameter.py | 4 - python/ray/remote_function.py | 10 +- python/ray/tests/test_advanced.py | 180 --------- python/ray/tests/test_basic.py | 368 ++++++++++++------ python/ray/worker.py | 101 ++--- src/ray/core_worker/core_worker.cc | 128 ++++-- src/ray/core_worker/core_worker.h | 15 +- 17 files changed, 489 insertions(+), 648 deletions(-) delete mode 100644 python/ray/local_mode_manager.py diff --git a/python/ray/_raylet.pxd b/python/ray/_raylet.pxd index 53157a1cb..85c4512e1 100644 --- a/python/ray/_raylet.pxd +++ b/python/ray/_raylet.pxd @@ -76,6 +76,7 @@ cdef class CoreWorker: object async_thread object async_event_loop object plasma_event_handler + c_bool is_local_mode cdef _create_put_buffer(self, shared_ptr[CBuffer] &metadata, size_t data_size, ObjectID object_id, diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 63400ff47..acedced0e 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -645,16 +645,17 @@ cdef class CoreWorker: def __cinit__(self, is_driver, store_socket, raylet_socket, JobID job_id, GcsClientOptions gcs_options, log_dir, - node_ip_address, node_manager_port): - + node_ip_address, node_manager_port, local_mode): + use_driver = is_driver or local_mode self.core_worker.reset(new CCoreWorker( - WORKER_TYPE_DRIVER if is_driver else WORKER_TYPE_WORKER, + WORKER_TYPE_DRIVER if use_driver else WORKER_TYPE_WORKER, LANGUAGE_PYTHON, store_socket.encode("ascii"), raylet_socket.encode("ascii"), job_id.native(), gcs_options.native()[0], log_dir.encode("utf-8"), node_ip_address.encode("utf-8"), node_manager_port, task_execution_handler, check_signals, gc_collect, - get_py_stack, True)) + get_py_stack, True, local_mode)) + self.is_local_mode = local_mode def run_task_loop(self): with nogil: @@ -738,6 +739,7 @@ cdef class CoreWorker: CObjectID c_object_id shared_ptr[CBuffer] data shared_ptr[CBuffer] metadata + c_vector[CObjectID] c_object_id_vector metadata = string_to_buffer(serialized_object.metadata) total_bytes = serialized_object.total_bytes @@ -748,12 +750,19 @@ cdef class CoreWorker: if not object_already_exists: write_serialized_object(serialized_object, data) - with nogil: - # Using custom object IDs is not supported because we can't - # track their lifecycle, so don't pin the object in that case. - check_status( - self.core_worker.get().Seal( - c_object_id, pin_object and object_id is None)) + if self.is_local_mode: + c_object_id_vector.push_back(c_object_id) + check_status(self.core_worker.get().Put( + CRayObject(data, metadata, c_object_id_vector), + c_object_id_vector, c_object_id)) + else: + with nogil: + # Using custom object IDs is not supported because we can't + # track their lifecycle, so we don't pin the object in this + # case. + check_status(self.core_worker.get().Seal( + c_object_id, + pin_object and object_id is None)) return c_object_id.Binary() @@ -1057,6 +1066,7 @@ cdef class CoreWorker: c_vector[size_t] data_sizes c_vector[shared_ptr[CBuffer]] metadatas c_vector[c_vector[CObjectID]] contained_ids + c_vector[CObjectID] return_ids_vector if return_ids.size() == 0: return @@ -1086,6 +1096,15 @@ cdef class CoreWorker: if returns[0][i].get() != NULL: write_serialized_object( serialized_object, returns[0][i].get().GetData()) + if self.is_local_mode: + return_ids_vector.push_back(return_ids[i]) + check_status( + self.core_worker.get().Put( + CRayObject(returns[0][i].get().GetData(), + returns[0][i].get().GetMetadata(), + return_ids_vector), + return_ids_vector, return_ids[i])) + return_ids_vector.clear() def create_or_get_event_loop(self): if self.async_event_loop is None: diff --git a/python/ray/actor.py b/python/ray/actor.py index 879498725..e96dadb19 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -1,4 +1,3 @@ -import copy import inspect import logging import weakref @@ -10,7 +9,7 @@ import ray.ray_constants as ray_constants import ray._raylet import ray.signature as signature import ray.worker -from ray import ActorID, ActorClassID, Language +from ray import ActorClassID, Language from ray._raylet import PythonFunctionDescriptor from ray import cross_language @@ -503,66 +502,57 @@ class ActorClass: if meta.num_cpus is None else meta.num_cpus) actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED - # Do not export the actor class or the actor if run in LOCAL_MODE - # Instead, instantiate the actor locally and add it to the worker's - # dictionary + # LOCAL_MODE cannot handle cross_language if worker.mode == ray.LOCAL_MODE: assert not meta.is_cross_language, \ "Cross language ActorClass cannot be executed locally." - actor_id = ActorID.from_random() - worker.actors[actor_id] = meta.modified_class( - *copy.deepcopy(args), **copy.deepcopy(kwargs)) + + # Export the actor. + if not meta.is_cross_language and (meta.last_export_session_and_job != + worker.current_session_and_job): + # If this actor class was not exported in this session and job, + # we need to export this function again, because current GCS + # doesn't have it. + meta.last_export_session_and_job = (worker.current_session_and_job) + # After serialize / deserialize modified class, the __module__ + # of modified class will be ray.cloudpickle.cloudpickle. + # So, here pass actor_creation_function_descriptor to make + # sure export actor class correct. + worker.function_actor_manager.export_actor_class( + meta.modified_class, meta.actor_creation_function_descriptor, + meta.method_meta.methods.keys()) + + resources = ray.utils.resources_from_resource_arguments( + cpus_to_use, meta.num_gpus, meta.memory, meta.object_store_memory, + meta.resources, num_cpus, num_gpus, memory, object_store_memory, + resources) + + # If the actor methods require CPU resources, then set the required + # placement resources. If actor_placement_resources is empty, then + # the required placement resources will be the same as resources. + actor_placement_resources = {} + assert actor_method_cpu in [0, 1] + if actor_method_cpu == 1: + actor_placement_resources = resources.copy() + actor_placement_resources["CPU"] += 1 + if meta.is_cross_language: + creation_args = cross_language.format_args(worker, args, kwargs) else: - # Export the actor. - if not meta.is_cross_language and (meta.last_export_session_and_job - != - worker.current_session_and_job): - # If this actor class was not exported in this session and job, - # we need to export this function again, because current GCS - # doesn't have it. - meta.last_export_session_and_job = ( - worker.current_session_and_job) - # After serialize / deserialize modified class, the __module__ - # of modified class will be ray.cloudpickle.cloudpickle. - # So, here pass actor_creation_function_descriptor to make - # sure export actor class correct. - worker.function_actor_manager.export_actor_class( - meta.modified_class, - meta.actor_creation_function_descriptor, - meta.method_meta.methods.keys()) - - resources = ray.utils.resources_from_resource_arguments( - cpus_to_use, meta.num_gpus, meta.memory, - meta.object_store_memory, meta.resources, num_cpus, num_gpus, - memory, object_store_memory, resources) - - # If the actor methods require CPU resources, then set the required - # placement resources. If actor_placement_resources is empty, then - # the required placement resources will be the same as resources. - actor_placement_resources = {} - assert actor_method_cpu in [0, 1] - if actor_method_cpu == 1: - actor_placement_resources = resources.copy() - actor_placement_resources["CPU"] += 1 - if meta.is_cross_language: - creation_args = cross_language.format_args( - worker, args, kwargs) - else: - function_signature = meta.method_meta.signatures["__init__"] - creation_args = signature.flatten_args(function_signature, - args, kwargs) - actor_id = worker.core_worker.create_actor( - meta.language, - meta.actor_creation_function_descriptor, - creation_args, - meta.max_reconstructions, - resources, - actor_placement_resources, - max_concurrency, - detached, - is_asyncio, - # Store actor_method_cpu in actor handle's extension data. - extension_data=str(actor_method_cpu)) + function_signature = meta.method_meta.signatures["__init__"] + creation_args = signature.flatten_args(function_signature, args, + kwargs) + actor_id = worker.core_worker.create_actor( + meta.language, + meta.actor_creation_function_descriptor, + creation_args, + meta.max_reconstructions, + resources, + actor_placement_resources, + max_concurrency, + detached, + is_asyncio, + # Store actor_method_cpu in actor handle's extension data. + extension_data=str(actor_method_cpu)) actor_handle = ActorHandle( meta.language, @@ -705,14 +695,10 @@ class ActorHandle: assert not self._ray_is_cross_language,\ "Cross language remote actor method " \ "cannot be executed locally." - function = getattr(worker.actors[self._actor_id], method_name) - object_ids = worker.local_mode_manager.execute( - function, method_name, args, kwargs, num_return_vals) - else: - object_ids = worker.core_worker.submit_actor_task( - self._ray_actor_language, self._ray_actor_id, - function_descriptor, list_args, num_return_vals, - self._ray_actor_method_cpus) + + object_ids = worker.core_worker.submit_actor_task( + self._ray_actor_language, self._ray_actor_id, function_descriptor, + list_args, num_return_vals, self._ray_actor_method_cpus) if len(object_ids) == 1: object_ids = object_ids[0] diff --git a/python/ray/dashboard/client/src/api.ts b/python/ray/dashboard/client/src/api.ts index 184a1f517..05992a2c2 100644 --- a/python/ray/dashboard/client/src/api.ts +++ b/python/ray/dashboard/client/src/api.ts @@ -192,7 +192,8 @@ export const launchKillActor = ( actorIpAddress: string, actorPort: number, ) => - get("/api/kill_actor", { + get("/api/kill_actor", { + // make sure object is okay actor_id: actorId, ip_address: actorIpAddress, port: actorPort, diff --git a/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx b/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx index baa864bc4..a607a3d98 100644 --- a/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx +++ b/python/ray/dashboard/client/src/pages/dashboard/logical-view/Actor.tsx @@ -256,12 +256,10 @@ class Actor extends React.Component, State> { ))} ){" "} - {actor.state === 0 ? ( + {actor.state === 0 && ( Kill Actor - ) : ( - "" )} {Object.entries(profiling).map( ([profilingId, { startTime, latestResponse }]) => diff --git a/python/ray/function_manager.py b/python/ray/function_manager.py index 793e5118f..b42bdd842 100644 --- a/python/ray/function_manager.py +++ b/python/ray/function_manager.py @@ -128,8 +128,6 @@ class FunctionActorManager: Args: remote_function: the RemoteFunction object. """ - if self._worker.mode == ray.worker.LOCAL_MODE: - return if self._worker.load_code_from_local: return @@ -348,8 +346,9 @@ class FunctionActorManager: # finish before the task finished, and still uses Ray API # after that. assert not self._worker.current_job_id.is_nil(), ( - "You might have started a background thread in a non-actor task, " - "please make sure the thread finishes before the task finishes.") + "You might have started a background thread in a non-actor " + "task, please make sure the thread finishes before the " + "task finishes.") job_id = self._worker.current_job_id key = (b"ActorClass:" + job_id.binary() + b":" + actor_creation_function_descriptor.function_id.binary()) diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index da38c83b6..01542d756 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -193,6 +193,9 @@ cdef extern from "ray/common/buffer.h" namespace "ray" nogil: cdef extern from "ray/common/ray_object.h" nogil: cdef cppclass CRayObject "ray::RayObject": + CRayObject(const shared_ptr[CBuffer] &data, + const shared_ptr[CBuffer] &metadata, + const c_vector[CObjectID] &nested_ids) c_bool HasData() const c_bool HasMetadata() const const size_t DataSize() const diff --git a/python/ray/includes/libcoreworker.pxd b/python/ray/includes/libcoreworker.pxd index 053a968fb..d903f7e60 100644 --- a/python/ray/includes/libcoreworker.pxd +++ b/python/ray/includes/libcoreworker.pxd @@ -98,7 +98,8 @@ cdef extern from "ray/core_worker/core_worker.h" nogil: CRayStatus() nogil, void() nogil, void(c_string *stack_out) nogil, - c_bool ref_counting_enabled) + c_bool ref_counting_enabled, + c_bool local_worker) CWorkerType &GetWorkerType() CLanguage &GetLanguage() diff --git a/python/ray/local_mode_manager.py b/python/ray/local_mode_manager.py deleted file mode 100644 index 2b2bae292..000000000 --- a/python/ray/local_mode_manager.py +++ /dev/null @@ -1,152 +0,0 @@ -import copy -import traceback - -import ray -from ray import ObjectID -from ray.utils import format_error_message -from ray.exceptions import RayTaskError - - -class LocalModeObjectID(ObjectID): - """Wrapper class around ray.ObjectID used for local mode. - - Object values are stored directly as a field of the LocalModeObjectID. - - Attributes: - value: Field that stores object values. If this field does not exist, - it equates to the object not existing in the object store. This is - necessary because None is a valid object value. - """ - - def __copy__(self): - new = LocalModeObjectID(self.binary()) - if hasattr(self, "value"): - new.value = self.value - return new - - def __deepcopy__(self, memo=None): - new = LocalModeObjectID(self.binary()) - if hasattr(self, "value"): - new.value = self.value - return new - - -class LocalModeManager: - """Used to emulate remote operations when running in local mode.""" - - def __init__(self): - """Initialize a LocalModeManager.""" - - def execute(self, function, function_name, args, kwargs, num_return_vals): - """Synchronously executes a "remote" function or actor method. - - Stores results directly in the generated and returned - LocalModeObjectIDs. Any exceptions raised during function execution - will be stored under all returned object IDs and later raised by the - worker. - - Args: - function: The function to execute. - function_name: Name of the function to execute. - args: Arguments to the function. These will not be modified by - the function execution. - kwargs: Keyword arguments to the function. - num_return_vals: Number of expected return values specified in the - function's decorator. - - Returns: - LocalModeObjectIDs corresponding to the function return values. - """ - return_ids = [ - LocalModeObjectID.from_random() for _ in range(num_return_vals) - ] - new_args = [] - for i, arg in enumerate(args): - if isinstance(arg, ObjectID): - new_args.append(ray.get(arg)) - else: - new_args.append(copy.deepcopy(arg)) - - new_kwargs = {} - for k, v in kwargs.items(): - if isinstance(v, ObjectID): - new_kwargs[k] = ray.get(v) - else: - new_kwargs[k] = copy.deepcopy(v) - - try: - results = function(*new_args, **new_kwargs) - if num_return_vals == 1: - return_ids[0].value = results - else: - for object_id, result in zip(return_ids, results): - object_id.value = result - except Exception as e: - backtrace = format_error_message(traceback.format_exc()) - task_error = RayTaskError(function_name, backtrace, e.__class__) - for object_id in return_ids: - object_id.value = task_error - - return return_ids - - def put_object(self, value): - """Store an object in the emulated object store. - - Implemented by generating a LocalModeObjectID and storing the value - directly within it. - - Args: - value: The value to store. - - Returns: - LocalModeObjectID corresponding to the value. - """ - object_id = LocalModeObjectID.from_random() - object_id.value = value - return object_id - - def get_objects(self, object_ids): - """Fetch objects from the emulated object store. - - Accepts only LocalModeObjectIDs and reads values directly from them. - - Args: - object_ids: A list of object IDs to fetch values for. - - Raises: - TypeError if any of the object IDs are not LocalModeObjectIDs. - KeyError if any of the object IDs do not contain values. - """ - results = [] - for object_id in object_ids: - if not isinstance(object_id, LocalModeObjectID): - raise TypeError("Only LocalModeObjectIDs are supported " - "when running in LOCAL_MODE. Using " - "user-generated ObjectIDs will fail.") - if not hasattr(object_id, "value"): - raise KeyError("Value for {} not found".format(object_id)) - - results.append(object_id.value) - - return results - - def free(self, object_ids): - """Delete objects from the emulated object store. - - Accepts only LocalModeObjectIDs and deletes their values directly. - - Args: - object_ids: A list of ObjectIDs to delete. - - Raises: - TypeError if any of the object IDs are not LocalModeObjectIDs. - """ - for object_id in object_ids: - if not isinstance(object_id, LocalModeObjectID): - raise TypeError("Only LocalModeObjectIDs are supported " - "when running in LOCAL_MODE. Using " - "user-generated ObjectIDs will fail.") - try: - del object_id.value - except AttributeError: - pass diff --git a/python/ray/node.py b/python/ray/node.py index 10a129f9f..e885258fe 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -923,6 +923,7 @@ class Node: return not any(self.dead_processes()) +# TODO(ilr) Remove this soon class LocalNode: """Imitate the node that manages the processes in local mode.""" diff --git a/python/ray/parameter.py b/python/ray/parameter.py index 752d2c694..6cd4f6d4d 100644 --- a/python/ray/parameter.py +++ b/python/ray/parameter.py @@ -37,8 +37,6 @@ class RayParams: object IDs. The same value can be used across multiple runs of the same job in order to generate the object IDs in a consistent manner. However, the same ID should not be used for different jobs. - local_mode (bool): True if the code should be executed serially - without Ray. This is useful for debugging. redirect_worker_output: True if the stdout and stderr of worker processes should be redirected to files. redirect_output (bool): True if stdout and stderr for non-worker @@ -98,7 +96,6 @@ class RayParams: node_manager_port=None, node_ip_address=None, object_id_seed=None, - local_mode=False, driver_mode=None, redirect_worker_output=None, redirect_output=None, @@ -134,7 +131,6 @@ class RayParams: self.object_manager_port = object_manager_port self.node_manager_port = node_manager_port self.node_ip_address = node_ip_address - self.local_mode = local_mode self.driver_mode = driver_mode self.redirect_worker_output = redirect_worker_output self.redirect_output = redirect_output diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index 5a250ddeb..a1e571d5c 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -203,13 +203,9 @@ class RemoteFunction: assert not self._is_cross_language, \ "Cross language remote function " \ "cannot be executed locally." - object_ids = worker.local_mode_manager.execute( - self._function, self._function_descriptor, args, kwargs, - num_return_vals) - else: - object_ids = worker.core_worker.submit_task( - self._language, self._function_descriptor, list_args, - num_return_vals, resources, max_retries) + object_ids = worker.core_worker.submit_task( + self._language, self._function_descriptor, list_args, + num_return_vals, resources, max_retries) if len(object_ids) == 1: return object_ids[0] diff --git a/python/ray/tests/test_advanced.py b/python/ray/tests/test_advanced.py index cc1afe75f..65918a239 100644 --- a/python/ray/tests/test_advanced.py +++ b/python/ray/tests/test_advanced.py @@ -501,186 +501,6 @@ def test_multithreading(ray_start_2_cpus): ray.get(actor.join.remote()) == "ok" -def test_local_mode(shutdown_only): - @ray.remote - def local_mode_f(): - return np.array([0, 0]) - - @ray.remote - def local_mode_g(x): - x[0] = 1 - return x - - ray.init(local_mode=True) - - @ray.remote - def f(): - return np.ones([3, 4, 5]) - - xref = f.remote() - # Remote functions should return ObjectIDs. - assert isinstance(xref, ray.ObjectID) - assert np.alltrue(ray.get(xref) == np.ones([3, 4, 5])) - y = np.random.normal(size=[11, 12]) - # Check that ray.get(ray.put) is the identity. - assert np.alltrue(y == ray.get(ray.put(y))) - - # Make sure objects are immutable, this example is why we need to copy - # arguments before passing them into remote functions in python mode - aref = local_mode_f.remote() - assert np.alltrue(ray.get(aref) == np.array([0, 0])) - bref = local_mode_g.remote(ray.get(aref)) - # Make sure local_mode_g does not mutate aref. - assert np.alltrue(ray.get(aref) == np.array([0, 0])) - assert np.alltrue(ray.get(bref) == np.array([1, 0])) - - # wait should return the first num_returns values passed in as the - # first list and the remaining values as the second list - num_returns = 5 - object_ids = [ray.put(i) for i in range(20)] - ready, remaining = ray.wait( - object_ids, num_returns=num_returns, timeout=None) - assert ready == object_ids[:num_returns] - assert remaining == object_ids[num_returns:] - - # Check that ray.put() and ray.internal.free() work in local mode. - - v1 = np.ones(10) - v2 = np.zeros(10) - - k1 = ray.put(v1) - assert np.alltrue(v1 == ray.get(k1)) - k2 = ray.put(v2) - assert np.alltrue(v2 == ray.get(k2)) - - ray.internal.free([k1, k2]) - with pytest.raises(Exception): - ray.get(k1) - with pytest.raises(Exception): - ray.get(k2) - - # Should fail silently. - ray.internal.free([k1, k2]) - - # Test actors in LOCAL_MODE. - - @ray.remote - class LocalModeTestClass: - def __init__(self, array): - self.array = array - - def set_array(self, array): - self.array = array - - def get_array(self): - return self.array - - def modify_and_set_array(self, array): - array[0] = -1 - self.array = array - - @ray.method(num_return_vals=3) - def returns_multiple(self): - return 1, 2, 3 - - test_actor = LocalModeTestClass.remote(np.arange(10)) - obj = test_actor.get_array.remote() - assert isinstance(obj, ray.ObjectID) - assert np.alltrue(ray.get(obj) == np.arange(10)) - - test_array = np.arange(10) - # Remote actor functions should not mutate arguments - test_actor.modify_and_set_array.remote(test_array) - assert np.alltrue(test_array == np.arange(10)) - # Remote actor functions should keep state - test_array[0] = -1 - assert np.alltrue(test_array == ray.get(test_actor.get_array.remote())) - - # Check that actor handles work in local mode. - - @ray.remote - def use_actor_handle(handle): - array = np.ones(10) - handle.set_array.remote(array) - assert np.alltrue(array == ray.get(handle.get_array.remote())) - - ray.get(use_actor_handle.remote(test_actor)) - - # Check that exceptions are deferred until ray.get(). - - exception_str = "test_advanced remote task exception" - - @ray.remote - def throws(): - raise Exception(exception_str) - - obj = throws.remote() - with pytest.raises(Exception, match=exception_str): - ray.get(obj) - - # Check that multiple return values are handled properly. - - @ray.remote(num_return_vals=3) - def returns_multiple(): - return 1, 2, 3 - - obj1, obj2, obj3 = returns_multiple.remote() - assert ray.get(obj1) == 1 - assert ray.get(obj2) == 2 - assert ray.get(obj3) == 3 - assert ray.get([obj1, obj2, obj3]) == [1, 2, 3] - - obj1, obj2, obj3 = test_actor.returns_multiple.remote() - assert ray.get(obj1) == 1 - assert ray.get(obj2) == 2 - assert ray.get(obj3) == 3 - assert ray.get([obj1, obj2, obj3]) == [1, 2, 3] - - @ray.remote(num_return_vals=2) - def returns_multiple_throws(): - raise Exception(exception_str) - - obj1, obj2 = returns_multiple_throws.remote() - with pytest.raises(Exception, match=exception_str): - ray.get(obj) - ray.get(obj1) - with pytest.raises(Exception, match=exception_str): - ray.get(obj2) - - # Check that Actors are not overwritten by remote calls from different - # classes. - @ray.remote - class RemoteActor1: - def __init__(self): - pass - - def function1(self): - return 0 - - @ray.remote - class RemoteActor2: - def __init__(self): - pass - - def function2(self): - return 1 - - actor1 = RemoteActor1.remote() - _ = RemoteActor2.remote() - assert ray.get(actor1.function1.remote()) == 0 - - # Test passing ObjectIDs. - @ray.remote - def direct_dep(input): - return input - - @ray.remote - def indirect_dep(input): - return ray.get(direct_dep.remote(input[0])) - - assert ray.get(indirect_dep.remote(["hello"])) == "hello" - - def test_wait_makes_object_local(ray_start_cluster): cluster = ray_start_cluster cluster.add_node(num_cpus=0) diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index a424fc3e3..1366ddae9 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -70,6 +70,134 @@ def test_omp_threads_set(shutdown_only): assert os.environ["OMP_NUM_THREADS"] == "1" +def test_submit_api(shutdown_only): + ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1}) + + @ray.remote + def f(n): + return list(range(n)) + + @ray.remote + def g(): + return ray.get_gpu_ids() + + assert f._remote([0], num_return_vals=0) is None + id1 = f._remote(args=[1], num_return_vals=1) + assert ray.get(id1) == [0] + id1, id2 = f._remote(args=[2], num_return_vals=2) + assert ray.get([id1, id2]) == [0, 1] + id1, id2, id3 = f._remote(args=[3], num_return_vals=3) + assert ray.get([id1, id2, id3]) == [0, 1, 2] + assert ray.get( + g._remote(args=[], num_cpus=1, num_gpus=1, + resources={"Custom": 1})) == [0] + infeasible_id = g._remote(args=[], resources={"NonexistentCustom": 1}) + assert ray.get(g._remote()) == [] + ready_ids, remaining_ids = ray.wait([infeasible_id], timeout=0.05) + assert len(ready_ids) == 0 + assert len(remaining_ids) == 1 + + @ray.remote + class Actor: + def __init__(self, x, y=0): + self.x = x + self.y = y + + def method(self, a, b=0): + return self.x, self.y, a, b + + def gpu_ids(self): + return ray.get_gpu_ids() + + @ray.remote + class Actor2: + def __init__(self): + pass + + def method(self): + pass + + a = Actor._remote( + args=[0], kwargs={"y": 1}, num_gpus=1, resources={"Custom": 1}) + + a2 = Actor2._remote() + ray.get(a2.method._remote()) + + id1, id2, id3, id4 = a.method._remote( + args=["test"], kwargs={"b": 2}, num_return_vals=4) + assert ray.get([id1, id2, id3, id4]) == [0, 1, "test", 2] + + +def test_many_fractional_resources(shutdown_only): + ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2}) + + @ray.remote + def g(): + return 1 + + @ray.remote + def f(block, accepted_resources): + true_resources = { + resource: value[0][1] + for resource, value in ray.get_resource_ids().items() + } + if block: + ray.get(g.remote()) + return true_resources == accepted_resources + + # Check that the resource are assigned correctly. + result_ids = [] + for rand1, rand2, rand3 in np.random.uniform(size=(100, 3)): + resource_set = {"CPU": int(rand1 * 10000) / 10000} + result_ids.append(f._remote([False, resource_set], num_cpus=rand1)) + + resource_set = {"CPU": 1, "GPU": int(rand1 * 10000) / 10000} + result_ids.append(f._remote([False, resource_set], num_gpus=rand1)) + + resource_set = {"CPU": 1, "Custom": int(rand1 * 10000) / 10000} + result_ids.append( + f._remote([False, resource_set], resources={"Custom": rand1})) + + resource_set = { + "CPU": int(rand1 * 10000) / 10000, + "GPU": int(rand2 * 10000) / 10000, + "Custom": int(rand3 * 10000) / 10000 + } + result_ids.append( + f._remote( + [False, resource_set], + num_cpus=rand1, + num_gpus=rand2, + resources={"Custom": rand3})) + result_ids.append( + f._remote( + [True, resource_set], + num_cpus=rand1, + num_gpus=rand2, + resources={"Custom": rand3})) + assert all(ray.get(result_ids)) + + # Check that the available resources at the end are the same as the + # beginning. + stop_time = time.time() + 10 + correct_available_resources = False + while time.time() < stop_time: + if (ray.available_resources()["CPU"] == 2.0 + and ray.available_resources()["GPU"] == 2.0 + and ray.available_resources()["Custom"] == 2.0): + correct_available_resources = True + break + if not correct_available_resources: + assert False, "Did not get correct available resources." + + +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_simple_serialization(ray_start_regular): primitive_objects = [ # Various primitive types. @@ -191,6 +319,13 @@ def test_fair_queueing(shutdown_only): assert len(ready) == 1000, len(ready) +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_complex_serialization(ray_start_regular): def assert_equal(obj1, obj2): module_numpy = (type(obj1).__module__ == np.__name__ @@ -455,6 +590,13 @@ def test_function_descriptor(): assert d.get(python_descriptor2) == 123 +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_nested_functions(ray_start_regular): # Make sure that remote functions can use other values that are defined # after the remote function but before the first function invocation. @@ -504,6 +646,13 @@ def test_nested_functions(ray_start_regular): assert ray.get(factorial_odd.remote(5)) == 120 +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_ray_recursive_objects(ray_start_regular): class ClassA: pass @@ -530,6 +679,13 @@ def test_ray_recursive_objects(ray_start_regular): ray.put(obj) +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_reducer_override_no_reference_cycle(ray_start_regular): # bpo-39492: reducer_override used to induce a spurious reference cycle # inside the Pickler object, that could prevent all serialized objects @@ -566,6 +722,13 @@ def test_reducer_override_no_reference_cycle(ray_start_regular): assert new_obj() is None +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_deserialized_from_buffer_immutable(ray_start_regular): x = np.full((2, 2), 1.) o = ray.put(x) @@ -575,6 +738,13 @@ def test_deserialized_from_buffer_immutable(ray_start_regular): y[0, 0] = 9. +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_passing_arguments_by_value_out_of_the_box(ray_start_regular): @ray.remote def f(x): @@ -607,6 +777,13 @@ def test_passing_arguments_by_value_out_of_the_box(ray_start_regular): ray.get(ray.put(Foo)) +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_putting_object_that_closes_over_object_id(ray_start_regular): # This test is here to prevent a regression of # https://github.com/ray-project/ray/issues/1317. @@ -650,6 +827,13 @@ def test_put_get(shutdown_only): assert value_before == value_after +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_custom_serializers(ray_start_regular): class Foo: def __init__(self): @@ -680,6 +864,13 @@ def test_custom_serializers(ray_start_regular): assert ray.get(f.remote()) == ((3, "string1", Bar.__name__), "string2") +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_serialization_final_fallback(ray_start_regular): pytest.importorskip("catboost") # This test will only run when "catboost" is installed. @@ -840,6 +1031,13 @@ def test_register_class(ray_start_2_cpus): assert not hasattr(c2, "method1") +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_keyword_args(ray_start_regular): @ray.remote def keyword_fct1(a, b="hello"): @@ -1040,6 +1238,13 @@ def test_args_stars_after(ray_start_regular): ray.get(remote_test_function.remote(local_method, actor_method)) +@pytest.mark.parametrize( + "shutdown_only", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_variable_number_of_args(shutdown_only): @ray.remote def varargs_fct1(*a): @@ -1085,6 +1290,13 @@ def test_variable_number_of_args(shutdown_only): ray.get(no_op.remote()) +@pytest.mark.parametrize( + "shutdown_only", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_defining_remote_functions(shutdown_only): ray.init(num_cpus=3) @@ -1133,6 +1345,13 @@ def test_defining_remote_functions(shutdown_only): assert ray.get(m.remote(1)) == 2 +@pytest.mark.parametrize( + "shutdown_only", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_redefining_remote_functions(shutdown_only): ray.init(num_cpus=1) @@ -1189,127 +1408,13 @@ def test_redefining_remote_functions(shutdown_only): assert ray.get(ray.get(h.remote(i))) == i -def test_submit_api(shutdown_only): - ray.init(num_cpus=2, num_gpus=1, resources={"Custom": 1}) - - @ray.remote - def f(n): - return list(range(n)) - - @ray.remote - def g(): - return ray.get_gpu_ids() - - assert f._remote([0], num_return_vals=0) is None - id1 = f._remote(args=[1], num_return_vals=1) - assert ray.get(id1) == [0] - id1, id2 = f._remote(args=[2], num_return_vals=2) - assert ray.get([id1, id2]) == [0, 1] - id1, id2, id3 = f._remote(args=[3], num_return_vals=3) - assert ray.get([id1, id2, id3]) == [0, 1, 2] - assert ray.get( - g._remote(args=[], num_cpus=1, num_gpus=1, - resources={"Custom": 1})) == [0] - infeasible_id = g._remote(args=[], resources={"NonexistentCustom": 1}) - assert ray.get(g._remote()) == [] - ready_ids, remaining_ids = ray.wait([infeasible_id], timeout=0.05) - assert len(ready_ids) == 0 - assert len(remaining_ids) == 1 - - @ray.remote - class Actor: - def __init__(self, x, y=0): - self.x = x - self.y = y - - def method(self, a, b=0): - return self.x, self.y, a, b - - def gpu_ids(self): - return ray.get_gpu_ids() - - @ray.remote - class Actor2: - def __init__(self): - pass - - def method(self): - pass - - a = Actor._remote( - args=[0], kwargs={"y": 1}, num_gpus=1, resources={"Custom": 1}) - - a2 = Actor2._remote() - ray.get(a2.method._remote()) - - id1, id2, id3, id4 = a.method._remote( - args=["test"], kwargs={"b": 2}, num_return_vals=4) - assert ray.get([id1, id2, id3, id4]) == [0, 1, "test", 2] - - -def test_many_fractional_resources(shutdown_only): - ray.init(num_cpus=2, num_gpus=2, resources={"Custom": 2}) - - @ray.remote - def g(): - return 1 - - @ray.remote - def f(block, accepted_resources): - true_resources = { - resource: value[0][1] - for resource, value in ray.get_resource_ids().items() - } - if block: - ray.get(g.remote()) - return true_resources == accepted_resources - - # Check that the resource are assigned correctly. - result_ids = [] - for rand1, rand2, rand3 in np.random.uniform(size=(100, 3)): - resource_set = {"CPU": int(rand1 * 10000) / 10000} - result_ids.append(f._remote([False, resource_set], num_cpus=rand1)) - - resource_set = {"CPU": 1, "GPU": int(rand1 * 10000) / 10000} - result_ids.append(f._remote([False, resource_set], num_gpus=rand1)) - - resource_set = {"CPU": 1, "Custom": int(rand1 * 10000) / 10000} - result_ids.append( - f._remote([False, resource_set], resources={"Custom": rand1})) - - resource_set = { - "CPU": int(rand1 * 10000) / 10000, - "GPU": int(rand2 * 10000) / 10000, - "Custom": int(rand3 * 10000) / 10000 - } - result_ids.append( - f._remote( - [False, resource_set], - num_cpus=rand1, - num_gpus=rand2, - resources={"Custom": rand3})) - result_ids.append( - f._remote( - [True, resource_set], - num_cpus=rand1, - num_gpus=rand2, - resources={"Custom": rand3})) - assert all(ray.get(result_ids)) - - # Check that the available resources at the end are the same as the - # beginning. - stop_time = time.time() + 10 - correct_available_resources = False - while time.time() < stop_time: - if (ray.available_resources()["CPU"] == 2.0 - and ray.available_resources()["GPU"] == 2.0 - and ray.available_resources()["Custom"] == 2.0): - correct_available_resources = True - break - if not correct_available_resources: - assert False, "Did not get correct available resources." - - +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_get_multiple(ray_start_regular): object_ids = [ray.put(i) for i in range(10)] assert ray.get(object_ids) == list(range(10)) @@ -1321,6 +1426,13 @@ def test_get_multiple(ray_start_regular): assert results == indices +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_get_multiple_experimental(ray_start_regular): object_ids = [ray.put(i) for i in range(10)] @@ -1331,6 +1443,13 @@ def test_get_multiple_experimental(ray_start_regular): assert ray.experimental.get(object_ids_nparray) == list(range(10)) +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) def test_get_dict(ray_start_regular): d = {str(i): ray.put(i) for i in range(5)} for i in range(5, 10): @@ -1361,6 +1480,13 @@ def test_get_with_timeout(ray_start_regular): assert time.time() - start < 30 +@pytest.mark.parametrize( + "ray_start_regular", [{ + "local_mode": True + }, { + "local_mode": False + }], + indirect=True) # https://github.com/ray-project/ray/issues/6329 def test_call_actors_indirect_through_tasks(ray_start_regular): @ray.remote diff --git a/python/ray/worker.py b/python/ray/worker.py index afc1e4bd5..7bea64849 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -15,7 +15,6 @@ import sys import threading import time import traceback -import random # Ray modules import ray.cloudpickle as pickle @@ -54,7 +53,6 @@ from ray.utils import ( is_cython, setup_logger, ) -from ray.local_mode_manager import LocalModeManager SCRIPT_MODE = 0 WORKER_MODE = 1 @@ -264,6 +262,10 @@ class Worker: "do this, you can wrap the ray.ObjectID in a list and " "call 'put' on it (or return it).") + if self.mode == LOCAL_MODE: + assert object_id is None, ("Local Mode does not support " + "inserting with an objectID") + serialized_value = self.get_serialization_context().serialize(value) # This *must* be the first place that we construct this python # ObjectID because an entry with 0 local references is created when @@ -291,10 +293,6 @@ class Worker: whose values should be retrieved. timeout (float): timeout (float): The maximum amount of time in seconds to wait before returning. - - Raises: - Exception if running in LOCAL_MODE and any of the object IDs do not - exist in the emulated object store. """ # Make sure that the values are object IDs. for object_id in object_ids: @@ -303,9 +301,6 @@ class Worker: "Attempting to call `get` on the value {}, " "which is not an ray.ObjectID.".format(object_id)) - if self.mode == LOCAL_MODE: - return self.local_mode_manager.get_objects(object_ids) - timeout_ms = int(timeout * 1000) if timeout else -1 data_metadata_pairs = self.core_worker.get_objects( object_ids, self.current_task_id, timeout_ms) @@ -438,9 +433,11 @@ def get_gpu_ids(): Returns: A list of GPU IDs. """ + + # TODO(ilr) Handle inserting resources in local mode if _mode() == LOCAL_MODE: - raise RuntimeError("ray.get_gpu_ids() currently does not work in " - "local_mode.") + logger.info("ray.get_gpu_ids() currently does not work in LOCAL " + "MODE.") all_resource_ids = global_worker.core_worker.resource_ids() assigned_ids = [ @@ -600,8 +597,8 @@ def init(address=None, same driver in order to generate the object IDs in a consistent manner. However, the same ID should not be used for different drivers. - local_mode (bool): True if the code should be executed serially - without Ray. This is useful for debugging. + local_mode (bool): True if the code should be executed serially. This + is useful for debugging. driver_object_store_memory (int): Limit the amount of memory the driver can use in the object store for creating objects. By default, this is autoset based on available system memory, subject to a 20GB cap. @@ -705,17 +702,13 @@ def init(address=None, _internal_config["free_objects_period_milliseconds"] = 1000 global _global_node - if driver_mode == LOCAL_MODE: - # If starting Ray in LOCAL_MODE, don't start any other processes. - _global_node = ray.node.LocalNode() - elif redis_address is None: + if redis_address is None: # In this case, we need to start a new cluster. ray_params = ray.parameter.RayParams( redis_address=redis_address, redis_port=redis_port, node_ip_address=node_ip_address, object_id_seed=object_id_seed, - local_mode=local_mode, driver_mode=driver_mode, redirect_worker_output=redirect_worker_output, redirect_output=redirect_output, @@ -1122,12 +1115,11 @@ def connect(node, ray._raylet.set_internal_config(internal_config) - if mode is not LOCAL_MODE: - # Create a Redis client to primary. - # The Redis client can safely be shared between threads. However, - # that is not true of Redis pubsub clients. See the documentation at - # https://github.com/andymccurdy/redis-py#thread-safety. - worker.redis_client = node.create_redis_client() + # Create a Redis client to primary. + # The Redis client can safely be shared between threads. However, + # that is not true of Redis pubsub clients. See the documentation at + # https://github.com/andymccurdy/redis-py#thread-safety. + worker.redis_client = node.create_redis_client() # Initialize some fields. if mode is WORKER_MODE: @@ -1136,12 +1128,6 @@ def connect(node, job_id = JobID.nil() # TODO(qwang): Rename this to `worker_id_str` or type to `WorkerID` worker.worker_id = _random_string() - setproctitle.setproctitle("ray::IDLE") - elif mode is LOCAL_MODE: - if job_id is None: - job_id = JobID.from_int(random.randint(1, 65535)) - worker.worker_id = ray.utils.compute_driver_id_from_job( - job_id).binary() else: # This is the code path of driver mode. if job_id is None: @@ -1155,6 +1141,9 @@ def connect(node, worker.worker_id = ray.utils.compute_driver_id_from_job( job_id).binary() + if mode is not SCRIPT_MODE and setproctitle: + setproctitle.setproctitle("ray::IDLE") + if not isinstance(job_id, JobID): raise TypeError("The type of given job id must be JobID.") @@ -1163,12 +1152,6 @@ def connect(node, worker.node = node worker.set_mode(mode) - # If running Ray in LOCAL_MODE, there is no need to create call - # create_worker or to start the worker service. - if mode == LOCAL_MODE: - worker.local_mode_manager = LocalModeManager() - return - # For driver's check that the version information matches the version # information that the Ray cluster was started with. try: @@ -1249,9 +1232,9 @@ def connect(node, (log_stderr_file if log_stderr_file is not None else sys.stderr).name) worker.redis_client.hmset(b"Workers:" + worker.worker_id, worker_dict) - else: - raise ValueError("Invalid worker mode. Expected DRIVER or WORKER.") - + elif not LOCAL_MODE: + raise ValueError( + "Invalid worker mode. Expected DRIVER, WORKER or LOCAL.") redis_address, redis_port = node.redis_address.split(":") gcs_options = ray._raylet.GcsClientOptions( redis_address, @@ -1259,15 +1242,9 @@ def connect(node, node.redis_password, ) worker.core_worker = ray._raylet.CoreWorker( - (mode == SCRIPT_MODE), - node.plasma_store_socket_name, - node.raylet_socket_name, - job_id, - gcs_options, - node.get_logs_dir_path(), - node.node_ip_address, - node.node_manager_port, - ) + (mode == SCRIPT_MODE), node.plasma_store_socket_name, + node.raylet_socket_name, job_id, gcs_options, node.get_logs_dir_path(), + node.node_ip_address, node.node_manager_port, mode == LOCAL_MODE) if driver_object_store_memory is not None: worker.core_worker.set_object_store_client_options( @@ -1276,9 +1253,10 @@ def connect(node, # Put something in the plasma store so that subsequent plasma store # accesses will be faster. Currently the first access is always slow, and # we don't want the user to experience this. - temporary_object_id = ray.ObjectID.from_random() - worker.put_object(1, object_id=temporary_object_id) - ray.internal.free([temporary_object_id]) + if mode != LOCAL_MODE: + temporary_object_id = ray.ObjectID.from_random() + worker.put_object(1, object_id=temporary_object_id) + ray.internal.free([temporary_object_id]) # Start the import thread worker.import_thread = import_thread.ImportThread(worker, mode, @@ -1540,16 +1518,13 @@ def put(value, weakref=False): worker = global_worker worker.check_connected() with profiling.profile("ray.put"): - if worker.mode == LOCAL_MODE: - object_id = worker.local_mode_manager.put_object(value) - else: - try: - object_id = worker.put_object(value, pin_object=not weakref) - except ObjectStoreFullError: - logger.info( - "Put failed since the value was either too large or the " - "store was full of pinned objects.") - raise + try: + object_id = worker.put_object(value, pin_object=not weakref) + except ObjectStoreFullError: + logger.info( + "Put failed since the value was either too large or the " + "store was full of pinned objects.") + raise return object_id @@ -1624,10 +1599,6 @@ def wait(object_ids, num_returns=1, timeout=None): worker.check_connected() # TODO(swang): Check main thread. with profiling.profile("ray.wait"): - # When Ray is run in LOCAL_MODE, all functions are run immediately, - # so all objects in object_id are ready. - if worker.mode == LOCAL_MODE: - return object_ids[:num_returns], object_ids[num_returns:] # TODO(rkn): This is a temporary workaround for # https://github.com/ray-project/ray/issues/997. However, it should be diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index de13080ea..783cff3dd 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -84,11 +84,12 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, std::function check_signals, std::function gc_collect, std::function get_lang_stack, - bool ref_counting_enabled) + bool ref_counting_enabled, bool local_mode) : worker_type_(worker_type), language_(language), log_dir_(log_dir), ref_counting_enabled_(ref_counting_enabled), + is_local_mode_(local_mode), check_signals_(check_signals), gc_collect_(gc_collect), get_call_site_(RayConfig::instance().record_ref_creation_sites() ? get_lang_stack @@ -129,7 +130,7 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, io_service_, gcs_client_); // Initialize task receivers. - if (worker_type_ == WorkerType::WORKER) { + if (worker_type_ == WorkerType::WORKER || is_local_mode_) { RAY_CHECK(task_execution_callback_ != nullptr); auto execute_task = std::bind(&CoreWorker::ExecuteTask, this, std::placeholders::_1, @@ -230,10 +231,11 @@ CoreWorker::CoreWorker(const WorkerType worker_type, const Language language, std::shared_ptr data = std::make_shared(); data->mutable_task()->mutable_task_spec()->CopyFrom(builder.Build().GetMessage()); - RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(data, nullptr)); + if (!is_local_mode_) { + RAY_CHECK_OK(gcs_client_->Tasks().AsyncAdd(data, nullptr)); + } SetCurrentTaskId(task_id); } - auto client_factory = [this](const rpc::Address &addr) { return std::shared_ptr( new rpc::CoreWorkerClient(addr, *client_call_manager_)); @@ -443,7 +445,7 @@ void CoreWorker::RegisterOwnershipInfoAndResolveFuture( reference_counter_->AddBorrowedObject(object_id, outer_object_id, owner_id, owner_address); - RAY_CHECK(!owner_id.IsNil()); + RAY_CHECK(!owner_id.IsNil() || is_local_mode_); // We will ask the owner about the object until the object is // created or we can no longer reach the owner. future_resolver_->ResolveFutureAsync(object_id, owner_id, owner_address); @@ -469,6 +471,10 @@ Status CoreWorker::Put(const RayObject &object, const std::vector &contained_object_ids, const ObjectID &object_id, bool pin_object) { bool object_exists; + if (is_local_mode_) { + RAY_CHECK(memory_store_->Put(object, object_id)); + return Status::OK(); + } RAY_RETURN_NOT_OK(plasma_store_provider_->Put(object, object_id, &object_exists)); if (!object_exists) { if (pin_object) { @@ -498,8 +504,14 @@ Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t *object_id = ObjectID::ForPut(worker_context_.GetCurrentTaskID(), worker_context_.GetNextPutIndex(), static_cast(TaskTransportType::DIRECT)); - RAY_RETURN_NOT_OK( - plasma_store_provider_->Create(metadata, data_size, *object_id, data)); + + if (is_local_mode_) { + *data = std::make_shared(data_size); + } else { + RAY_RETURN_NOT_OK( + plasma_store_provider_->Create(metadata, data_size, *object_id, data)); + } + // Only add the object to the reference counter if it didn't already exist. if (data) { reference_counter_->AddOwnedObject(*object_id, contained_object_ids, GetCallerId(), @@ -511,7 +523,12 @@ Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t Status CoreWorker::Create(const std::shared_ptr &metadata, const size_t data_size, const ObjectID &object_id, std::shared_ptr *data) { - return plasma_store_provider_->Create(metadata, data_size, object_id, data); + if (is_local_mode_) { + return Status::NotImplemented( + "Creating an object with a pre-existing ObjectID is not supported in local mode"); + } else { + return plasma_store_provider_->Create(metadata, data_size, object_id, data); + } } Status CoreWorker::Seal(const ObjectID &object_id, bool pin_object, @@ -774,6 +791,11 @@ TaskID CoreWorker::GetCallerId() const { Status CoreWorker::PushError(const JobID &job_id, const std::string &type, const std::string &error_message, double timestamp) { + if (is_local_mode_) { + RAY_LOG(ERROR) << "Pushed Error with JobID: " << job_id << " of type: " << type + << " with message: " << error_message << " at time: " << timestamp; + return Status::OK(); + } return local_raylet_client_->PushError(job_id, type, error_message, timestamp); } @@ -809,9 +831,13 @@ Status CoreWorker::SubmitTask(const RayFunction &function, rpc_address_, function, args, task_options.num_returns, task_options.resources, required_resources, return_ids); TaskSpecification task_spec = builder.Build(); - task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, CurrentCallSite(), - max_retries); - return direct_task_submitter_->SubmitTask(task_spec); + if (is_local_mode_) { + return ExecuteTaskLocalMode(task_spec); + } else { + task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, + CurrentCallSite(), max_retries); + return direct_task_submitter_->SubmitTask(task_spec); + } } Status CoreWorker::CreateActor(const RayFunction &function, @@ -839,12 +865,16 @@ Status CoreWorker::CreateActor(const RayFunction &function, *return_actor_id = actor_id; TaskSpecification task_spec = builder.Build(); - task_manager_->AddPendingTask( - GetCallerId(), rpc_address_, task_spec, CurrentCallSite(), - std::max(RayConfig::instance().actor_creation_min_retries(), - actor_creation_options.max_reconstructions)); - Status status = direct_task_submitter_->SubmitTask(task_spec); - + Status status; + if (is_local_mode_) { + status = ExecuteTaskLocalMode(task_spec); + } else { + task_manager_->AddPendingTask( + GetCallerId(), rpc_address_, task_spec, CurrentCallSite(), + std::max(RayConfig::instance().actor_creation_min_retries(), + actor_creation_options.max_reconstructions)); + status = direct_task_submitter_->SubmitTask(task_spec); + } std::unique_ptr actor_handle(new ActorHandle( actor_id, GetCallerId(), rpc_address_, job_id, /*actor_cursor=*/return_ids[0], function.GetLanguage(), function.GetFunctionDescriptor(), extension_data)); @@ -884,6 +914,9 @@ Status CoreWorker::SubmitActorTask(const ActorID &actor_id, const RayFunction &f // Submit task. Status status; TaskSpecification task_spec = builder.Build(); + if (is_local_mode_) { + return ExecuteTaskLocalMode(task_spec, actor_id); + } task_manager_->AddPendingTask(GetCallerId(), rpc_address_, task_spec, CurrentCallSite()); if (actor_handle->IsDead()) { @@ -1033,7 +1066,9 @@ Status CoreWorker::AllocateReturnObjects( RAY_CHECK(object_ids.size() == data_sizes.size()); return_objects->resize(object_ids.size(), nullptr); - rpc::Address owner_address(worker_context_.GetCurrentTask()->CallerAddress()); + rpc::Address owner_address(is_local_mode_ + ? rpc::Address() + : worker_context_.GetCurrentTask()->CallerAddress()); for (size_t i = 0; i < object_ids.size(); i++) { bool object_already_exists = false; @@ -1048,8 +1083,8 @@ Status CoreWorker::AllocateReturnObjects( } // Allocate a buffer for the return object. - if (static_cast(data_sizes[i]) < - RayConfig::instance().max_direct_call_object_size()) { + if (is_local_mode_ || static_cast(data_sizes[i]) < + RayConfig::instance().max_direct_call_object_size()) { data_buffer = std::make_shared(data_sizes[i]); } else { RAY_RETURN_NOT_OK( @@ -1071,16 +1106,17 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, const std::shared_ptr &resource_ids, std::vector> *return_objects, ReferenceCounter::ReferenceTableProto *borrowed_refs) { - RAY_LOG(DEBUG) << "Executing task " << task_spec.TaskId(); task_queue_length_ -= 1; num_executed_tasks_ += 1; if (resource_ids != nullptr) { resource_ids_ = resource_ids; } - worker_context_.SetCurrentTask(task_spec); - SetCurrentTaskId(task_spec.TaskId()); + if (!is_local_mode_) { + worker_context_.SetCurrentTask(task_spec); + SetCurrentTaskId(task_spec.TaskId()); + } { absl::MutexLock lock(&mutex_); current_task_ = task_spec; @@ -1131,8 +1167,8 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, arg_reference_ids, return_ids, return_objects, worker_context_.GetWorkerID()); absl::optional caller_address( - worker_context_.GetCurrentTask()->CallerAddress()); - + is_local_mode_ ? absl::optional() + : worker_context_.GetCurrentTask()->CallerAddress()); for (size_t i = 0; i < return_objects->size(); i++) { // The object is nullptr if it already existed in the object store. if (!return_objects->at(i)) { @@ -1168,13 +1204,16 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, RAY_LOG(DEBUG) << "There were " << reference_counter_->NumObjectIDsInScope() << " ObjectIDs left in scope after executing task " << task_spec.TaskId() - << ". This is either caused by keeping references to ObjectIDs in Python between " + << ". This is either caused by keeping references to ObjectIDs in Python " + "between " "tasks (e.g., in global variables) or indicates a problem with Ray's " "reference counting, and may cause problems in the object store."; } - SetCurrentTaskId(TaskID::Nil()); - worker_context_.ResetCurrentTask(task_spec); + if (!is_local_mode_) { + SetCurrentTaskId(TaskID::Nil()); + worker_context_.ResetCurrentTask(task_spec); + } { absl::MutexLock lock(&mutex_); current_task_ = TaskSpecification(); @@ -1188,6 +1227,24 @@ Status CoreWorker::ExecuteTask(const TaskSpecification &task_spec, return status; } +Status CoreWorker::ExecuteTaskLocalMode(const TaskSpecification &task_spec, + const ActorID &actor_id) { + auto resource_ids = std::make_shared(); + auto return_objects = std::vector>(); + auto borrowed_refs = ReferenceCounter::ReferenceTableProto(); + for (size_t i = 0; i < task_spec.NumReturns(); i++) { + reference_counter_->AddOwnedObject(task_spec.ReturnId(i, TaskTransportType::DIRECT), + /*inner_ids=*/{}, GetCallerId(), rpc_address_, + CurrentCallSite(), -1); + } + auto old_id = GetActorId(); + SetActorId(actor_id); + auto status = ExecuteTask(task_spec, resource_ids, &return_objects, &borrowed_refs); + SetActorId(old_id); + // TODO(ilr): Maybe not necessary + return status; +} + Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task, std::vector> *args, std::vector *arg_reference_ids, @@ -1206,7 +1263,7 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task, // Direct call type objects that weren't inlined have been promoted to plasma. // We need to put an OBJECT_IN_PLASMA error here so the subsequent call to Get() // properly redirects to the plasma store. - if (task.ArgId(i, 0).IsDirectCallType()) { + if (task.ArgId(i, 0).IsDirectCallType() && !is_local_mode_) { RAY_UNUSED(memory_store_->Put(RayObject(rpc::ErrorType::OBJECT_IN_PLASMA), task.ArgId(i, 0))); } @@ -1252,8 +1309,13 @@ Status CoreWorker::BuildArgsForExecutor(const TaskSpecification &task, // Fetch by-reference arguments directly from the plasma store. bool got_exception = false; absl::flat_hash_map> result_map; - RAY_RETURN_NOT_OK(plasma_store_provider_->Get(by_ref_ids, -1, worker_context_, - &result_map, &got_exception)); + if (is_local_mode_) { + RAY_RETURN_NOT_OK( + memory_store_->Get(by_ref_ids, -1, worker_context_, &result_map, &got_exception)); + } else { + RAY_RETURN_NOT_OK(plasma_store_provider_->Get(by_ref_ids, -1, worker_context_, + &result_map, &got_exception)); + } for (const auto &it : result_map) { for (size_t idx : by_ref_indices[it.first]) { args->at(idx) = it.second; @@ -1511,7 +1573,9 @@ void CoreWorker::HandlePlasmaObjectReady(const rpc::PlasmaObjectReadyRequest &re void CoreWorker::SetActorId(const ActorID &actor_id) { absl::MutexLock lock(&mutex_); - RAY_CHECK(actor_id_.IsNil()); + if (!is_local_mode_) { + RAY_CHECK(actor_id_.IsNil()); + } actor_id_ = actor_id; } diff --git a/src/ray/core_worker/core_worker.h b/src/ray/core_worker/core_worker.h index c419dc4db..61d286060 100644 --- a/src/ray/core_worker/core_worker.h +++ b/src/ray/core_worker/core_worker.h @@ -92,7 +92,7 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::function check_signals = nullptr, std::function gc_collect = nullptr, std::function get_lang_stack = nullptr, - bool ref_counting_enabled = false); + bool ref_counting_enabled = false, bool local_mode = false); virtual ~CoreWorker(); @@ -138,7 +138,8 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { void RemoveLocalReference(const ObjectID &object_id) { std::vector deleted; reference_counter_->RemoveLocalReference(object_id, &deleted); - if (ref_counting_enabled_) { + // TOOD(ilr): better way of keeping an object from being deleted + if (ref_counting_enabled_ && !is_local_mode_) { memory_store_->Delete(deleted); } } @@ -629,6 +630,13 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { std::vector> *return_objects, ReferenceCounter::ReferenceTableProto *borrowed_refs); + /// Execute a local mode task (runs normal ExecuteTask) + /// + /// \param spec[in] task_spec Task specification. + /// \return Status. + Status ExecuteTaskLocalMode(const TaskSpecification &task_spec, + const ActorID &actor_id = ActorID::Nil()); + /// Build arguments for task executor. This would loop through all the arguments /// in task spec, and for each of them that's passed by reference (ObjectID), /// fetch its content from store and; for arguments that are passed by value, @@ -686,6 +694,9 @@ class CoreWorker : public rpc::CoreWorkerServiceHandler { /// Whether local reference counting is enabled. const bool ref_counting_enabled_; + /// Is local mode being used. + const bool is_local_mode_; + /// Application-language callback to check for signals that have been received /// since calling into C++. This will be called periodically (at least every /// 1s) during long-running operations.