diff --git a/.gitignore b/.gitignore index 9983fe144..0bb7eca8d 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ /python/build /python/dist /thirdparty/pkg/ +/build/java # Files generated by flatc should be ignored /src/ray/gcs/format/*_generated.h diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index c28b293b8..7c5f75b46 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -7,7 +7,7 @@ import numpy import time import logging -from libc.stdint cimport uint8_t, int32_t, int64_t +from libc.stdint cimport uint8_t, int32_t, int64_t, uint64_t from libcpp cimport bool as c_bool from libcpp.memory cimport ( dynamic_pointer_cast, @@ -34,6 +34,7 @@ from ray.includes.common cimport ( LANGUAGE_CPP, LANGUAGE_JAVA, LANGUAGE_PYTHON, + LocalMemoryBuffer, WORKER_TYPE_WORKER, WORKER_TYPE_DRIVER, ) @@ -49,9 +50,15 @@ from ray.includes.unique_ids cimport ( CObjectID, CClientID, ) -from ray.includes.libcoreworker cimport CCoreWorker, CTaskOptions +from ray.includes.libcoreworker cimport ( + CActorCreationOptions, + CCoreWorker, + CTaskOptions, +) from ray.includes.task cimport CTaskSpec from ray.includes.ray_config cimport RayConfig +import ray +from ray import profiling from ray.exceptions import RayletError, ObjectStoreFullError from ray.utils import decode from ray.ray_constants import ( @@ -246,15 +253,28 @@ cdef Language LANG_CPP = Language.from_native(LANGUAGE_CPP) cdef Language LANG_JAVA = Language.from_native(LANGUAGE_JAVA) -cdef unordered_map[c_string, double] resource_map_from_dict(resource_map): +cdef int prepare_resources( + dict resource_dict, + unordered_map[c_string, double] *resource_map) except -1: cdef: unordered_map[c_string, double] out c_string resource_name - if not isinstance(resource_map, dict): - raise TypeError("resource_map must be a dictionary") - for key, value in resource_map.items(): - out[key.encode("ascii")] = float(value) - return out + + if resource_dict is None: + raise ValueError("Must provide resource map.") + + for key, value in resource_dict.items(): + if not (isinstance(value, int) or isinstance(value, float)): + raise ValueError("Resource quantities may only be ints or floats.") + if value < 0: + raise ValueError("Resource quantities may not be negative.") + if value > 0: + if (value >= 1 and isinstance(value, float) + and not value.is_integer()): + raise ValueError( + "Resource quantities >1 must be whole numbers.") + resource_map[0][key.encode("ascii")] = float(value) + return 0 cdef c_vector[c_string] string_vector_from_list(list string_list): @@ -267,6 +287,33 @@ cdef c_vector[c_string] string_vector_from_list(list string_list): return out +cdef void prepare_args(list args, c_vector[CTaskArg] *args_vector): + cdef: + c_string pickled_str + shared_ptr[CBuffer] arg_data + shared_ptr[CBuffer] arg_metadata + + for arg in args: + if isinstance(arg, ObjectID): + args_vector.push_back( + CTaskArg.PassByReference((arg).native())) + elif not ray._raylet.check_simple_value(arg): + args_vector.push_back( + CTaskArg.PassByReference((ray.put(arg)).native())) + else: + pickled_str = pickle.dumps( + arg, protocol=pickle.HIGHEST_PROTOCOL) + # TODO(edoakes): This makes a copy that could be avoided. + arg_data = dynamic_pointer_cast[CBuffer, LocalMemoryBuffer]( + make_shared[LocalMemoryBuffer]( + (pickled_str.data()), + pickled_str.size(), + True)) + args_vector.push_back( + CTaskArg.PassByValue( + make_shared[CRayObject](arg_data, arg_metadata))) + + cdef class RayletClient: cdef CRayletClient* client @@ -280,13 +327,6 @@ cdef class RayletClient: # initialized before the raylet client. self.client = &core_worker.core_worker.get().GetRayletClient() - def submit_task(self, TaskSpec task_spec): - cdef: - CObjectID c_id - - check_status(self.client.SubmitTask( - task_spec.task_spec.get()[0])) - def get_task(self): cdef: unique_ptr[CTaskSpec] task_spec @@ -385,6 +425,23 @@ cdef class CoreWorker: with nogil: self.core_worker.get().Disconnect() + def set_current_task_id(self, TaskID task_id): + cdef: + CTaskID c_task_id = task_id.native() + + with nogil: + self.core_worker.get().SetCurrentTaskId(c_task_id) + + def get_current_task_id(self): + return TaskID(self.core_worker.get().GetCurrentTaskId().Binary()) + + def set_current_job_id(self, JobID job_id): + cdef: + CJobID c_job_id = job_id.native() + + with nogil: + self.core_worker.get().SetCurrentJobId(c_job_id) + def get_objects(self, object_ids, TaskID current_task_id): cdef: c_vector[shared_ptr[CRayObject]] results @@ -539,65 +596,6 @@ cdef class CoreWorker: check_status(self.core_worker.get().Objects().Delete( free_ids, local_only, delete_creating_tasks)) - def get_current_task_id(self): - return TaskID(self.core_worker.get().GetCurrentTaskId().Binary()) - - def set_current_task_id(self, TaskID task_id): - cdef: - CTaskID c_task_id = task_id.native() - - with nogil: - self.core_worker.get().SetCurrentTaskId(c_task_id) - - def set_current_job_id(self, JobID job_id): - cdef: - CJobID c_job_id = job_id.native() - - with nogil: - self.core_worker.get().SetCurrentJobId(c_job_id) - - def submit_task(self, - function_descriptor, - args, - int num_return_vals, - resources): - cdef: - unordered_map[c_string, double] c_resources - CTaskOptions task_options - CRayFunction ray_function - c_vector[CTaskArg] args_vector - c_vector[CObjectID] return_ids - c_string pickled_str - shared_ptr[CBuffer] arg_data - shared_ptr[CBuffer] arg_metadata - - c_resources = resource_map_from_dict(resources) - task_options = CTaskOptions(num_return_vals, c_resources) - ray_function = CRayFunction( - LANGUAGE_PYTHON, string_vector_from_list(function_descriptor)) - - for arg in args: - if isinstance(arg, ObjectID): - args_vector.push_back( - CTaskArg.PassByReference((arg).native())) - else: - pickled_str = pickle.dumps( - arg, protocol=pickle.HIGHEST_PROTOCOL) - arg_data = dynamic_pointer_cast[CBuffer, LocalMemoryBuffer]( - make_shared[LocalMemoryBuffer]( - (pickled_str.data()), - pickled_str.size(), - True)) - args_vector.push_back( - CTaskArg.PassByValue( - make_shared[CRayObject](arg_data, arg_metadata))) - - with nogil: - check_status(self.core_worker.get().Tasks().SubmitTask( - ray_function, args_vector, task_options, &return_ids)) - - return VectorToObjectIDs(return_ids) - def set_object_store_client_options(self, c_string client_name, int64_t limit_bytes): with nogil: @@ -613,6 +611,90 @@ cdef class CoreWorker: return message.decode("utf-8") + def submit_task(self, + function_descriptor, + args, + int num_return_vals, + resources): + cdef: + unordered_map[c_string, double] c_resources + CTaskOptions task_options + CRayFunction ray_function + c_vector[CTaskArg] args_vector + c_vector[CObjectID] return_ids + + with profiling.profile("submit_task"): + prepare_resources(resources, &c_resources) + task_options = CTaskOptions(num_return_vals, c_resources) + ray_function = CRayFunction( + LANGUAGE_PYTHON, string_vector_from_list(function_descriptor)) + prepare_args(args, &args_vector) + + with nogil: + check_status(self.core_worker.get().Tasks().SubmitTask( + ray_function, args_vector, task_options, &return_ids)) + + return VectorToObjectIDs(return_ids) + + def create_actor(self, + function_descriptor, + args, + uint64_t max_reconstructions, + resources, + placement_resources): + cdef: + ActorHandle actor_handle = ActorHandle.__new__(ActorHandle) + CRayFunction ray_function + c_vector[CTaskArg] args_vector + c_vector[c_string] dynamic_worker_options + unordered_map[c_string, double] c_resources + unordered_map[c_string, double] c_placement_resources + + with profiling.profile("submit_task"): + prepare_resources(resources, &c_resources) + prepare_resources(placement_resources, &c_placement_resources) + ray_function = CRayFunction( + LANGUAGE_PYTHON, string_vector_from_list(function_descriptor)) + prepare_args(args, &args_vector) + + with nogil: + check_status(self.core_worker.get().Tasks().CreateActor( + ray_function, args_vector, + CActorCreationOptions( + max_reconstructions, False, c_resources, + c_placement_resources, dynamic_worker_options), + &actor_handle.inner)) + + return actor_handle + + def submit_actor_task(self, + ActorHandle handle, + function_descriptor, + args, + int num_return_vals, + resources): + + cdef: + unordered_map[c_string, double] c_resources + CTaskOptions task_options + CRayFunction ray_function + c_vector[CTaskArg] args_vector + c_vector[CObjectID] return_ids + + with profiling.profile("submit_task"): + prepare_resources(resources, &c_resources) + task_options = CTaskOptions(num_return_vals, c_resources) + ray_function = CRayFunction( + LANGUAGE_PYTHON, string_vector_from_list(function_descriptor)) + prepare_args(args, &args_vector) + + with nogil: + check_status(self.core_worker.get().Tasks().SubmitActorTask( + handle.inner.get()[0], ray_function, + args_vector, task_options, &return_ids)) + + return VectorToObjectIDs(return_ids) + def profile_event(self, event_type, dict extra_data): cdef: c_string c_event_type = event_type.encode("ascii") diff --git a/python/ray/actor.py b/python/ray/actor.py index d367c7e11..a5523adaa 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -3,76 +3,24 @@ from __future__ import division from __future__ import print_function import copy -import hashlib import inspect import logging import six import sys -import threading from abc import ABCMeta, abstractmethod from collections import namedtuple from ray.function_manager import FunctionDescriptor import ray.ray_constants as ray_constants +import ray._raylet import ray.signature as signature import ray.worker -from ray import (ObjectID, ActorID, ActorHandleID, ActorClassID, TaskID) +from ray import ActorID, ActorHandleID, ActorClassID, profiling logger = logging.getLogger(__name__) -def compute_actor_handle_id(actor_handle_id, num_forks): - """Deterministically compute an actor handle ID. - - A new actor handle ID is generated when it is forked from another actor - handle. The new handle ID is computed as hash(old_handle_id || num_forks). - - Args: - actor_handle_id (common.ObjectID): The original actor handle ID. - num_forks: The number of times the original actor handle has been - forked so far. - - Returns: - An ID for the new actor handle. - """ - assert isinstance(actor_handle_id, ActorHandleID) - handle_id_hash = hashlib.sha1() - handle_id_hash.update(actor_handle_id.binary()) - handle_id_hash.update(str(num_forks).encode("ascii")) - handle_id = handle_id_hash.digest() - return ActorHandleID(handle_id) - - -def compute_actor_handle_id_non_forked(actor_handle_id, current_task_id): - """Deterministically compute an actor handle ID in the non-forked case. - - This code path is used whenever an actor handle is pickled and unpickled - (for example, if a remote function closes over an actor handle). Then, - whenever the actor handle is used, a new actor handle ID will be generated - on the fly as a deterministic function of the actor ID, the previous actor - handle ID and the current task ID. - - TODO(rkn): It may be possible to cause problems by closing over multiple - actor handles in a remote function, which then get unpickled and give rise - to the same actor handle IDs. - - Args: - actor_handle_id: The original actor handle ID. - current_task_id: The ID of the task that is unpickling the handle. - - Returns: - An ID for the new actor handle. - """ - assert isinstance(actor_handle_id, ActorHandleID) - assert isinstance(current_task_id, TaskID) - handle_id_hash = hashlib.sha1() - handle_id_hash.update(actor_handle_id.binary()) - handle_id_hash.update(current_task_id.binary()) - handle_id = handle_id_hash.digest() - return ActorHandleID(handle_id) - - def method(*args, **kwargs): """Annotate an actor method. @@ -359,14 +307,6 @@ class ActorClass(object): raise Exception("Actors cannot be created before ray.init() " "has been called.") - actor_id = ActorID.of(worker.current_job_id, worker.current_task_id, - worker.task_context.task_index + 1) - # The actor cursor is a dummy object representing the most recent - # actor method invocation. For each subsequent method invocation, - # the current cursor should be added as a dependency, and then - # updated to reflect the new invocation. - actor_cursor = None - # Set the actor's default resources if not already set. First three # conditions are to check that no resources were specified in the # decorator. Last three conditions are to check that no resources were @@ -386,12 +326,23 @@ class ActorClass(object): if self._num_cpus is None else self._num_cpus) actor_method_cpu = ray_constants.DEFAULT_ACTOR_METHOD_CPU_SPECIFIED + function_name = "__init__" + function_descriptor = FunctionDescriptor( + self._modified_class.__module__, function_name, + self._modified_class.__name__) + # Do not export the actor class or the actor if run in LOCAL_MODE # Instead, instantiate the actor locally and add it to the worker's # dictionary if worker.mode == ray.LOCAL_MODE: + actor_id = ActorID.of(worker.current_job_id, + worker.current_task_id, + worker.task_context.task_index + 1) worker.actors[actor_id] = self._modified_class( *copy.deepcopy(args), **copy.deepcopy(kwargs)) + core_handle = ray._raylet.ActorHandle( + actor_id, ActorHandleID.nil(), worker.current_job_id, + function_descriptor.get_function_descriptor_list()) else: # Export the actor. if (self._last_export_session_and_job != @@ -418,32 +369,25 @@ class ActorClass(object): actor_placement_resources = resources.copy() actor_placement_resources["CPU"] += 1 - function_name = "__init__" function_signature = self._method_signatures[function_name] creation_args = signature.extend_args(function_signature, args, kwargs) - function_descriptor = FunctionDescriptor( - self._modified_class.__module__, function_name, - self._modified_class.__name__) - [actor_cursor] = worker.submit_task( - function_descriptor, - creation_args, - actor_creation_id=actor_id, - max_actor_reconstructions=self._max_reconstructions, - num_return_vals=1, - resources=resources, - placement_resources=actor_placement_resources) - assert isinstance(actor_cursor, ObjectID) + core_handle = worker.core_worker.create_actor( + function_descriptor.get_function_descriptor_list(), + creation_args, self._max_reconstructions, resources, + actor_placement_resources) actor_handle = ActorHandle( - actor_id, self._modified_class.__module__, self._class_name, - actor_cursor, self._actor_method_names, self._method_decorators, - self._method_signatures, self._actor_method_num_return_vals, - actor_cursor, actor_method_cpu, worker.current_job_id, - worker.current_session_and_job) - # We increment the actor counter by 1 to account for the actor creation - # task. - actor_handle._ray_actor_counter += 1 + core_handle, + self._modified_class.__module__, + self._class_name, + self._actor_method_names, + self._method_decorators, + self._method_signatures, + self._actor_method_num_return_vals, + actor_method_cpu, + worker.current_session_and_job, + original_handle=True) return actor_handle @@ -464,23 +408,8 @@ class ActorHandle(object): cloudpickle). Attributes: - _ray_actor_id: The ID of the corresponding actor. + _ray_core_handle: Core worker actor handle for this actor. _ray_module_name: The module name of this actor. - _ray_actor_handle_id: The ID of this handle. If this is the "original" - handle for an actor (as opposed to one created by passing another - handle into a task), then this ID must be NIL_ID. If this - ActorHandle was created by forking an existing ActorHandle, then - this ID must be computed deterministically via - compute_actor_handle_id. If this ActorHandle was created by an - out-of-band mechanism (e.g., pickling), then this must be None (in - this case, a new actor handle ID will be generated on the fly every - time a method is invoked). - _ray_actor_cursor: The actor cursor is a dummy object representing the - most recent actor method invocation. For each subsequent method - invocation, the current cursor should be added as a dependency, and - then updated to reflect the new invocation. - _ray_actor_counter: The number of actor method invocations that we've - called so far. _ray_actor_method_names: The names of the actor methods. _ray_method_decorators: Optional decorators for the function invocation. This can be used to change the behavior on the @@ -490,63 +419,33 @@ class ActorHandle(object): _ray_method_num_return_vals: The default number of return values for each method. _ray_class_name: The name of the actor class. - _ray_actor_forks: The number of times this handle has been forked. - _ray_actor_creation_dummy_object_id: The dummy object ID from the actor - creation task. _ray_actor_method_cpus: The number of CPUs required by actor methods. _ray_original_handle: True if this is the original actor handle for a given actor. If this is true, then the actor will be destroyed when this handle goes out of scope. - _ray_actor_job_id: The ID of the job that created the actor - (it is possible that this ActorHandle exists on a job with a - different job ID). - _ray_new_actor_handles: The new actor handles that were created from - this handle since the last task on this handle was submitted. This - is used to garbage-collect dummy objects that are no longer - necessary in the backend. """ def __init__(self, - actor_id, + core_handle, module_name, class_name, - actor_cursor, actor_method_names, method_decorators, method_signatures, method_num_return_vals, - actor_creation_dummy_object_id, actor_method_cpus, - actor_job_id, session_and_job, - actor_handle_id=None): - assert isinstance(actor_id, ActorID) - assert isinstance(actor_job_id, ray.JobID) - self._ray_actor_id = actor_id + original_handle=False): + self._ray_core_handle = core_handle self._ray_module_name = module_name - # False if this actor handle was created by forking or pickling. True - # if it was created by the _serialization_helper function. - self._ray_original_handle = actor_handle_id is None - if self._ray_original_handle: - self._ray_actor_handle_id = ActorHandleID.nil() - else: - assert isinstance(actor_handle_id, ActorHandleID) - self._ray_actor_handle_id = actor_handle_id - self._ray_actor_cursor = actor_cursor - self._ray_actor_counter = 0 + self._ray_original_handle = original_handle self._ray_actor_method_names = actor_method_names self._ray_method_decorators = method_decorators self._ray_method_signatures = method_signatures self._ray_method_num_return_vals = method_num_return_vals self._ray_class_name = class_name - self._ray_actor_forks = 0 - self._ray_actor_creation_dummy_object_id = ( - actor_creation_dummy_object_id) self._ray_actor_method_cpus = actor_method_cpus - self._ray_actor_job_id = actor_job_id self._ray_session_and_job = session_and_job - self._ray_new_actor_handles = [] - self._ray_actor_lock = threading.Lock() def _actor_method_call(self, method_name, @@ -584,38 +483,16 @@ class ActorHandle(object): function_descriptor = FunctionDescriptor( self._ray_module_name, method_name, self._ray_class_name) - if worker.mode == ray.LOCAL_MODE: - function = getattr(worker.actors[self._ray_actor_id], method_name) - object_ids = worker.local_mode_manager.execute( - function, function_descriptor, args, num_return_vals) - else: - with self._ray_actor_lock: - object_ids = worker.submit_task( - function_descriptor, - args, - actor_id=self._ray_actor_id, - actor_handle_id=self._ray_actor_handle_id, - actor_counter=self._ray_actor_counter, - actor_creation_dummy_object_id=( - self._ray_actor_creation_dummy_object_id), - previous_actor_task_dummy_object_id=self._ray_actor_cursor, - new_actor_handles=self._ray_new_actor_handles, - # We add one for the dummy return ID. - num_return_vals=num_return_vals + 1, - resources={"CPU": self._ray_actor_method_cpus}, - placement_resources={}, - job_id=self._ray_actor_job_id, - ) - # Update the actor counter and cursor to reflect the most - # recent invocation. - self._ray_actor_counter += 1 - # The last object returned is the dummy object that should be - # passed in to the next actor method. Do not return it to the - # user. - self._ray_actor_cursor = object_ids.pop() - # We have notified the backend of the new actor handles to - # expect since the last task was submitted, so clear the list. - self._ray_new_actor_handles = [] + with profiling.profile("submit_task"): + if worker.mode == ray.LOCAL_MODE: + function = getattr(worker.actors[self._actor_id], method_name) + object_ids = worker.local_mode_manager.execute( + function, function_descriptor, args, num_return_vals) + else: + object_ids = worker.core_worker.submit_actor_task( + self._ray_core_handle, + function_descriptor.get_function_descriptor_list(), args, + num_return_vals, {"CPU": self._ray_actor_method_cpus}) if len(object_ids) == 1: object_ids = object_ids[0] @@ -654,7 +531,7 @@ class ActorHandle(object): def __repr__(self): return "Actor({}, {})".format(self._ray_class_name, - self._ray_actor_id.hex()) + self._actor_id.hex()) def __del__(self): """Kill the worker that is running this actor.""" @@ -674,8 +551,8 @@ class ActorHandle(object): # and we don't need to send `__ray_terminate__` again. logger.warning( "Actor is garbage collected in the wrong driver." + - " Actor id = %s, class name = %s.", self._ray_actor_id, - self._ray_class_name) + " Actor id = %s, class name = %s.", + self._ray_core_handle.actor_id(), self._ray_class_name) return if worker.connected and self._ray_original_handle: # TODO(rkn): Should we be passing in the actor cursor as a @@ -684,11 +561,11 @@ class ActorHandle(object): @property def _actor_id(self): - return self._ray_actor_id + return self._ray_core_handle.actor_id() @property def _actor_handle_id(self): - return self._ray_actor_handle_id + return self._ray_core_handle.actor_handle_id() def _serialization_helper(self, ray_forking): """This is defined in order to make pickling work. @@ -700,48 +577,17 @@ class ActorHandle(object): Returns: A dictionary of the information needed to reconstruct the object. """ - if ray_forking: - actor_handle_id = compute_actor_handle_id( - self._ray_actor_handle_id, self._ray_actor_forks) - else: - actor_handle_id = self._ray_actor_handle_id - - # Note: _ray_actor_cursor and _ray_actor_creation_dummy_object_id - # could be None. state = { - "actor_id": self._ray_actor_id, - "actor_handle_id": actor_handle_id, + "core_handle": self._ray_core_handle.fork(ray_forking).to_bytes(), "module_name": self._ray_module_name, "class_name": self._ray_class_name, - "actor_cursor": self._ray_actor_cursor, "actor_method_names": self._ray_actor_method_names, "method_decorators": self._ray_method_decorators, "method_signatures": self._ray_method_signatures, "method_num_return_vals": self._ray_method_num_return_vals, - # Actors in local mode don't have dummy objects. - "actor_creation_dummy_object_id": self. - _ray_actor_creation_dummy_object_id, - "actor_method_cpus": self._ray_actor_method_cpus, - "actor_job_id": self._ray_actor_job_id, - "ray_forking": ray_forking + "actor_method_cpus": self._ray_actor_method_cpus } - if ray_forking: - self._ray_actor_forks += 1 - new_actor_handle_id = actor_handle_id - else: - # The execution dependency for a pickled actor handle is never safe - # to release, since it could be unpickled and submit another - # dependent task at any time. Therefore, we notify the backend of a - # random handle ID that will never actually be used. - new_actor_handle_id = ActorHandleID.from_random() - # Notify the backend to expect this new actor handle. The backend will - # not release the cursor for any new handles until the first task for - # each of the new handles is submitted. - # NOTE(swang): There is currently no garbage collection for actor - # handles until the actor itself is removed. - self._ray_new_actor_handles.append(new_actor_handle_id) - return state def _deserialization_helper(self, state, ray_forking): @@ -755,39 +601,19 @@ class ActorHandle(object): worker = ray.worker.get_global_worker() worker.check_connected() - if state["ray_forking"]: - actor_handle_id = state["actor_handle_id"] - else: - # Right now, if the actor handle has been pickled, we create a - # temporary actor handle id for invocations. - # TODO(pcm): This still leads to a lot of actor handles being - # created, there should be a better way to handle pickled - # actor handles. + self.__init__( # TODO(swang): Accessing the worker's current task ID is not # thread-safe. - # TODO(swang): Unpickling the same actor handle twice in the same - # task will break the application, and unpickling it twice in the - # same actor is likely a performance bug. We should consider - # logging a warning in these cases. - actor_handle_id = compute_actor_handle_id_non_forked( - state["actor_handle_id"], worker.current_task_id) - - self.__init__( - state["actor_id"], + ray._raylet.ActorHandle.from_bytes(state["core_handle"], + worker.current_task_id), state["module_name"], state["class_name"], - state["actor_cursor"], state["actor_method_names"], state["method_decorators"], state["method_signatures"], state["method_num_return_vals"], - state["actor_creation_dummy_object_id"], state["actor_method_cpus"], - # This is the ID of the job that owns the actor, not - # necessarily the job that owns this actor handle. - state["actor_job_id"], - worker.current_session_and_job, - actor_handle_id=actor_handle_id) + worker.current_session_and_job) def __getstate__(self): """This code path is used by pickling but not by Ray forking.""" diff --git a/python/ray/experimental/named_actors.py b/python/ray/experimental/named_actors.py index a3f2f66bb..40db19141 100644 --- a/python/ray/experimental/named_actors.py +++ b/python/ray/experimental/named_actors.py @@ -51,13 +51,16 @@ def register_actor(name, actor_handle): raise TypeError("The actor_handle argument must be an ActorHandle " "object.") actor_name = _calculate_key(name) - pickled_state = pickle.dumps(actor_handle) + + # First check if the actor already exists. + try: + get_actor(name) + exists = True + except ValueError: + exists = False + + if exists: + raise ValueError("An actor with name={} already exists".format(name)) # Add the actor to Redis if it does not already exist. - already_exists = _internal_kv_put(actor_name, pickled_state) - if already_exists: - # If the registration fails, then erase the new actor handle that - # was added when pickling the actor handle. - actor_handle._ray_new_actor_handles.pop() - raise ValueError( - "Error: the actor with name={} already exists".format(name)) + _internal_kv_put(actor_name, pickle.dumps(actor_handle)) diff --git a/python/ray/experimental/signal.py b/python/ray/experimental/signal.py index 25ec072d3..39de8d861 100644 --- a/python/ray/experimental/signal.py +++ b/python/ray/experimental/signal.py @@ -50,7 +50,7 @@ def _get_task_id(source): - If source is a task id, return same task id. """ if type(source) is ray.actor.ActorHandle: - return source._ray_actor_id + return source._actor_id else: if type(source) is ray.TaskID: return source diff --git a/python/ray/includes/common.pxd b/python/ray/includes/common.pxd index 7050b8be8..04ae17942 100644 --- a/python/ray/includes/common.pxd +++ b/python/ray/includes/common.pxd @@ -1,5 +1,5 @@ from libcpp cimport bool as c_bool -from libcpp.memory cimport shared_ptr +from libcpp.memory cimport shared_ptr, unique_ptr from libcpp.string cimport string as c_string from libc.stdint cimport uint8_t, uint64_t, int64_t @@ -191,25 +191,28 @@ cdef extern from "ray/core_worker/task_interface.h" nogil: unordered_map[c_string, double] &resources) cdef cppclass CActorCreationOptions "ray::ActorCreationOptions": - CActorCreationOptions(uint64_t max_reconstructions, - const unordered_map[c_string, double] &resources) + CActorCreationOptions() + CActorCreationOptions( + uint64_t max_reconstructions, c_bool is_direct_call, + const unordered_map[c_string, double] &resources, + const unordered_map[c_string, double] &placement_resources, + const c_vector[c_string] &dynamic_worker_options) cdef cppclass CActorHandle "ray::ActorHandle": CActorHandle( const CActorID &actor_id, const CActorHandleID &actor_handle_id, - const CLanguage actor_language, + const CJobID &job_id, const CObjectID &initial_cursor, + const CLanguage actor_language, c_bool is_direct_call, const c_vector[c_string] &actor_creation_task_function_descriptor) + CActorHandle(CActorHandle &other, c_bool in_band) + CActorHandle( + const c_string &serialized, const CTaskID ¤t_task_id) - CActorHandle(const CActorHandle &other) - CActorID ActorID() const - CActorHandleID ActorHandleID() const - c_vector[c_string] ActorCreationTaskFunctionDescriptor() const - CObjectID ActorCursor() const - int64_t TaskCursor() const - int64_t NumForks() const - CActorHandle Fork() + CActorID GetActorID() const + CActorHandleID GetActorHandleID() const + unique_ptr[CActorHandle] Fork() + unique_ptr[CActorHandle] ForkForSerialization() void Serialize(c_string *output) - CActorHandle Deserialize(const c_string &data) cdef extern from "ray/gcs/gcs_client_interface.h" nogil: cdef cppclass CGcsClientOptions "ray::gcs::GcsClientOptions": diff --git a/python/ray/includes/common.pxi b/python/ray/includes/common.pxi index 971b4cfa7..71cfdfd64 100644 --- a/python/ray/includes/common.pxi +++ b/python/ray/includes/common.pxi @@ -1,23 +1,72 @@ from libcpp cimport bool as c_bool from libcpp.string cimport string as c_string +from libcpp.vector cimport vector as c_vector -from ray.includes.common cimport CGcsClientOptions +from ray.includes.common cimport ( + CActorHandle, + CGcsClientOptions, +) cdef class GcsClientOptions: """Cython wrapper class of C++ `ray::gcs::GcsClientOptions`.""" cdef: - unique_ptr[CGcsClientOptions] gcs_client_options + unique_ptr[CGcsClientOptions] inner def __init__(self, redis_ip, int redis_port, redis_password, c_bool is_test_client=False): if not redis_password: redis_password = "" - self.gcs_client_options.reset( + self.inner.reset( new CGcsClientOptions(redis_ip.encode("ascii"), redis_port, redis_password.encode("ascii"), is_test_client)) cdef CGcsClientOptions* native(self): - return (self.gcs_client_options.get()) + return (self.inner.get()) + +cdef class ActorHandle: + """Cython wrapper class of C++ `ray::ActorHandle`.""" + cdef: + unique_ptr[CActorHandle] inner + + def __init__(self, ActorID actor_id, ActorHandleID actor_handle_id, + JobID job_id, list creation_function_descriptor): + cdef: + c_vector[c_string] c_descriptor + ObjectID cursor = ObjectID.from_random() + + c_descriptor = string_vector_from_list(creation_function_descriptor) + self.inner.reset(new CActorHandle( + actor_id.native(), actor_handle_id.native(), job_id.native(), + cursor.native(), LANGUAGE_PYTHON, False, c_descriptor)) + + def fork(self, c_bool ray_forking): + cdef: + ActorHandle other = ActorHandle.__new__(ActorHandle) + if ray_forking: + other.inner = self.inner.get().Fork() + else: + other.inner = self.inner.get().ForkForSerialization() + return other + + @staticmethod + def from_bytes(c_string bytes, TaskID current_task_id): + cdef: + ActorHandle self = ActorHandle.__new__(ActorHandle) + self.inner.reset(new CActorHandle(bytes, current_task_id.native())) + return self + + def to_bytes(self): + cdef: + c_string output + + self.inner.get().Serialize(&output) + return output + + def actor_id(self): + return ActorID(self.inner.get().GetActorID().Binary()) + + def actor_handle_id(self): + return ActorHandleID(self.inner.get().GetActorHandleID().Binary()) diff --git a/python/ray/includes/task.pxd b/python/ray/includes/task.pxd index 8b2f040a6..4034d4c3e 100644 --- a/python/ray/includes/task.pxd +++ b/python/ray/includes/task.pxd @@ -77,36 +77,6 @@ cdef extern from "ray/common/task/task_spec.h" nogil: c_vector[CActorHandleID] NewActorHandles() const -cdef extern from "ray/common/task/task_util.h" nogil: - cdef cppclass TaskSpecBuilder "ray::TaskSpecBuilder": - TaskSpecBuilder &SetCommonTaskSpec( - const CTaskID &task_id, const CLanguage &language, - const c_vector[c_string] &function_descriptor, - const CJobID &job_id, const CTaskID &parent_task_id, - uint64_t parent_counter, uint64_t num_returns, - const unordered_map[c_string, double] &required_resources, - const unordered_map[c_string, double] &required_placement_resources) # noqa: E501 - - TaskSpecBuilder &AddByRefArg(const CObjectID &arg_id) - - TaskSpecBuilder &AddByValueArg(const c_string &data, - const c_string &metadata) - - TaskSpecBuilder &SetActorCreationTaskSpec( - const CActorID &actor_id, uint64_t max_reconstructions, - const c_vector[c_string] &dynamic_worker_options, - c_bool is_direct_call) - - TaskSpecBuilder &SetActorTaskSpec( - const CActorID &actor_id, const CActorHandleID &actor_handle_id, - const CObjectID &actor_creation_dummy_object_id, - const CObjectID &previous_actor_task_dummy_object_id, - uint64_t actor_counter, - const c_vector[CActorHandleID] &new_handle_ids) - - RpcTaskSpec GetMessage() - - cdef extern from "ray/common/task/task_execution_spec.h" nogil: cdef cppclass CTaskExecutionSpec "ray::TaskExecutionSpecification": CTaskExecutionSpec(RpcTaskExecutionSpec message) diff --git a/python/ray/includes/task.pxi b/python/ray/includes/task.pxi index f2a38ec4d..f1210572f 100644 --- a/python/ray/includes/task.pxi +++ b/python/ray/includes/task.pxi @@ -3,7 +3,6 @@ from ray.includes.task cimport ( CTaskExecutionSpec, CTaskSpec, RpcTaskExecutionSpec, - TaskSpecBuilder, TaskTableData, ) from ray.ray_constants import RAW_BUFFER_METADATA @@ -15,87 +14,6 @@ cdef class TaskSpec: cdef: unique_ptr[CTaskSpec] task_spec - def __init__(self, TaskID task_id, JobID job_id, function_descriptor, - arguments, - int num_returns, TaskID parent_task_id, int parent_counter, - ActorID actor_creation_id, - ObjectID actor_creation_dummy_object_id, - ObjectID previous_actor_task_dummy_object_id, - int32_t max_actor_reconstructions, ActorID actor_id, - ActorHandleID actor_handle_id, int actor_counter, - new_actor_handles, resource_map, placement_resource_map): - cdef: - TaskSpecBuilder builder - unordered_map[c_string, double] required_resources - unordered_map[c_string, double] required_placement_resources - c_vector[c_string] c_function_descriptor - c_string pickled_str - c_vector[CActorHandleID] c_new_actor_handles - - # Convert function descriptor to C++ vector. - for item in function_descriptor: - if not isinstance(item, bytes): - raise TypeError( - "'function_descriptor' takes a list of byte strings.") - c_function_descriptor.push_back(item) - - # Convert resource map to C++ unordered_map. - if resource_map is not None: - required_resources = resource_map_from_dict(resource_map) - if placement_resource_map is not None: - required_placement_resources = ( - resource_map_from_dict(placement_resource_map)) - - # Build common task spec. - builder.SetCommonTaskSpec( - task_id.native(), - LANGUAGE_PYTHON, - c_function_descriptor, - job_id.native(), - parent_task_id.native(), - parent_counter, - num_returns, - required_resources, - required_placement_resources, - ) - - # Build arguments. - for arg in arguments: - if isinstance(arg, ObjectID): - builder.AddByRefArg((arg).native()) - elif isinstance(arg, bytes): - builder.AddByValueArg(arg, RAW_BUFFER_METADATA) - else: - pickled_str = pickle.dumps( - arg, protocol=pickle.HIGHEST_PROTOCOL) - builder.AddByValueArg(pickled_str, b'') - - if not actor_creation_id.is_nil(): - # Actor creation task. - builder.SetActorCreationTaskSpec( - actor_creation_id.native(), - max_actor_reconstructions, - [], - False, - ) - elif not actor_id.is_nil(): - # Actor task. - for new_actor_handle in new_actor_handles: - c_new_actor_handles.push_back( - (new_actor_handle).native()) - builder.SetActorTaskSpec( - actor_id.native(), - actor_handle_id.native(), - actor_creation_dummy_object_id.native(), - previous_actor_task_dummy_object_id.native(), - actor_counter, - c_new_actor_handles, - ) - else: - # Normal task. - pass - self.task_spec.reset(new CTaskSpec(builder.GetMessage())) - @staticmethod cdef make(unique_ptr[CTaskSpec]& task_spec): cdef TaskSpec self = TaskSpec.__new__(TaskSpec) diff --git a/python/ray/remote_function.py b/python/ray/remote_function.py index a1e15f1dc..8685b5158 100644 --- a/python/ray/remote_function.py +++ b/python/ray/remote_function.py @@ -117,7 +117,7 @@ class RemoteFunction(object): memory=None, object_store_memory=None, resources=None): - """An experimental alternate way to submit remote functions.""" + """Submit the remote function for execution.""" worker = ray.worker.get_global_worker() worker.check_connected() @@ -148,11 +148,9 @@ class RemoteFunction(object): self._function, self._function_descriptor, args, num_return_vals) else: - object_ids = worker.submit_task( - self._function_descriptor, - args, - num_return_vals=num_return_vals, - resources=resources) + object_ids = worker.core_worker.submit_task( + self._function_descriptor.get_function_descriptor_list(), + args, num_return_vals, resources) if len(object_ids) == 1: return object_ids[0] diff --git a/python/ray/tests/test_actor.py b/python/ray/tests/test_actor.py index 2623bc7af..f67d3fd6b 100644 --- a/python/ray/tests/test_actor.py +++ b/python/ray/tests/test_actor.py @@ -346,7 +346,7 @@ def test_random_id_generation(ray_start_regular): random.seed(1234) f2 = Foo.remote() - assert f1._ray_actor_id != f2._ray_actor_id + assert f1._actor_id != f2._actor_id def test_actor_class_name(ray_start_regular): diff --git a/python/ray/worker.py b/python/ray/worker.py index bd04c346c..b4f2938ac 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -41,7 +41,6 @@ import ray.signature import ray.state from ray import ( - ActorHandleID, ActorID, WorkerID, JobID, @@ -569,164 +568,6 @@ class Worker(object): assert len(results) == len(object_ids) return results - def submit_task(self, - function_descriptor, - args, - actor_id=None, - actor_handle_id=None, - actor_counter=0, - actor_creation_id=None, - actor_creation_dummy_object_id=None, - previous_actor_task_dummy_object_id=None, - max_actor_reconstructions=0, - new_actor_handles=None, - num_return_vals=None, - resources=None, - placement_resources=None, - job_id=None): - """Submit a remote task to the scheduler. - - Tell the scheduler to schedule the execution of the function with - function_descriptor with arguments args. Retrieve object IDs for the - outputs of the function from the scheduler and immediately return them. - - Args: - function_descriptor: The function descriptor to execute. - args: The arguments to pass into the function. Arguments can be - object IDs or they can be values. If they are values, they must - be serializable objects. - actor_id: The ID of the actor that this task is for. - actor_counter: The counter of the actor task. - actor_creation_id: The ID of the actor to create, if this is an - actor creation task. - actor_creation_dummy_object_id: If this task is an actor method, - then this argument is the dummy object ID associated with the - actor creation task for the corresponding actor. - previous_actor_task_dummy_object_id: If this task is an actor, - then this argument is the dummy object ID associated with the - task previously submitted to the corresponding actor. - num_return_vals: The number of return values this function should - have. - resources: The resource requirements for this task. - placement_resources: The resources required for placing the task. - If this is not provided or if it is an empty dictionary, then - the placement resources will be equal to resources. - job_id: The ID of the relevant job. This is almost always the - job ID of the job that is currently running. However, in - the exceptional case that an actor task is being dispatched to - an actor created by a different job, this should be the - job ID of the job that created the actor. - - Returns: - The return object IDs for this task. - """ - with profiling.profile("submit_task"): - if actor_id is None: - assert actor_handle_id is None - actor_id = ActorID.nil() - actor_handle_id = ActorHandleID.nil() - else: - assert actor_handle_id is not None - - if actor_creation_id is None: - actor_creation_id = ActorID.nil() - - if actor_creation_dummy_object_id is None: - actor_creation_dummy_object_id = ObjectID.nil() - - # Put large or complex arguments that are passed by value in the - # object store first. - args_for_raylet = [] - for arg in args: - if isinstance(arg, ObjectID): - args_for_raylet.append(arg) - elif ray._raylet.check_simple_value(arg): - args_for_raylet.append(arg) - else: - args_for_raylet.append(put(arg)) - - if new_actor_handles is None: - new_actor_handles = [] - - if job_id is None: - job_id = self.current_job_id - - if resources is None: - raise ValueError("The resources dictionary is required.") - for value in resources.values(): - assert (isinstance(value, int) or isinstance(value, float)) - if value < 0: - raise ValueError( - "Resource quantities must be nonnegative.") - if (value >= 1 and isinstance(value, float) - and not value.is_integer()): - raise ValueError( - "Resource quantities must all be whole numbers.") - - # Remove any resources with zero quantity requirements - resources = { - resource_label: resource_quantity - for resource_label, resource_quantity in resources.items() - if resource_quantity > 0 - } - - if placement_resources is None: - placement_resources = {} - - # Increment the worker's task index to track how many tasks - # have been submitted by the current task so far. - self.task_context.task_index += 1 - # The parent task must be set for the submitted task. - assert not self.current_task_id.is_nil() - # Current driver id must not be nil when submitting a task. - # Because every task must belong to a driver. - assert not self.current_job_id.is_nil() - # Submit the task to raylet. - function_descriptor_list = ( - function_descriptor.get_function_descriptor_list()) - assert isinstance(job_id, JobID) - - if actor_creation_id is not None and not actor_creation_id.is_nil( - ): - # This is an actor creation task. - task_id = TaskID.for_actor_creation_task(actor_creation_id) - elif actor_id is not None and not actor_id.is_nil(): - # This is an actor task. - task_id = TaskID.for_actor_task( - self.current_job_id, self.current_task_id, - self.task_context.task_index, actor_id) - else: - # Normal tasks are submitted through the core worker (in the - # future, all tasks will be). - return self.core_worker.submit_task(function_descriptor_list, - args_for_raylet, - num_return_vals, resources) - - # Actor creation tasks and actor tasks are submitted directly to - # the raylet. - task = ray._raylet.TaskSpec( - task_id, - job_id, - function_descriptor_list, - args_for_raylet, - num_return_vals, - self.current_task_id, - self.task_context.task_index, - actor_creation_id, - actor_creation_dummy_object_id, - previous_actor_task_dummy_object_id, - max_actor_reconstructions, - actor_id, - actor_handle_id, - actor_counter, - new_actor_handles, - resources, - placement_resources, - ) - self.raylet_client.submit_task(task) - - return task.returns() - def run_function_on_all_workers(self, function, run_on_other_drivers=False): """Run arbitrary code on all of the workers. diff --git a/src/ray/common/id.cc b/src/ray/common/id.cc index 0f1207dae..2789f1838 100644 --- a/src/ray/common/id.cc +++ b/src/ray/common/id.cc @@ -346,8 +346,8 @@ ObjectID ObjectID::GenerateObjectId(const std::string &task_id_binary, return ret; } -const ActorHandleID ComputeNextActorHandleId(const ActorHandleID &actor_handle_id, - int64_t num_forks) { +const ActorHandleID ComputeForkedActorHandleId(const ActorHandleID &actor_handle_id, + int64_t num_forks) { // Compute hashes. SHA256_CTX ctx; sha256_init(&ctx); @@ -362,6 +362,23 @@ const ActorHandleID ComputeNextActorHandleId(const ActorHandleID &actor_handle_i return ActorHandleID::FromBinary(std::string(buff, buff + ActorHandleID::Size())); } +const ActorHandleID ComputeSerializedActorHandleId(const ActorHandleID &actor_handle_id, + const TaskID ¤t_task_id) { + // Compute hashes. + SHA256_CTX ctx; + sha256_init(&ctx); + sha256_update(&ctx, reinterpret_cast(actor_handle_id.Data()), + actor_handle_id.Size()); + sha256_update(&ctx, reinterpret_cast(current_task_id.Data()), + current_task_id.Size()); + + // Compute the final actor handle ID from the hash. + BYTE buff[DIGEST_SIZE]; + sha256_final(&ctx, buff); + RAY_CHECK(DIGEST_SIZE >= ActorHandleID::Size()); + return ActorHandleID::FromBinary(std::string(buff, buff + ActorHandleID::Size())); +} + JobID JobID::FromInt(uint32_t value) { std::vector data(JobID::Size(), 0); std::memcpy(data.data(), &value, JobID::Size()); diff --git a/src/ray/common/id.h b/src/ray/common/id.h index c140ae8a2..8e55f23a7 100644 --- a/src/ray/common/id.h +++ b/src/ray/common/id.h @@ -366,16 +366,25 @@ std::ostream &operator<<(std::ostream &os, const ObjectID &id); #undef DEFINE_UNIQUE_ID -// Restore the compiler alignment to defult (8 bytes). +// Restore the compiler alignment to default (8 bytes). #pragma pack(pop) -/// Compute the next actor handle ID of a new actor handle during a fork operation. +/// Compute an actor handle ID for a newly forked actor handle. /// -/// \param actor_handle_id The actor handle ID of original actor. -/// \param num_forks The count of forks of original actor. -/// \return The next actor handle ID generated from the given info. -const ActorHandleID ComputeNextActorHandleId(const ActorHandleID &actor_handle_id, - int64_t num_forks); +/// \param actor_handle_id The actor handle ID of the existing actor handle. +/// \param num_forks The number of forks of the existing actor handle. +/// \return Generated actor handle ID. +const ActorHandleID ComputeForkedActorHandleId(const ActorHandleID &actor_handle_id, + int64_t num_forks); + +/// Compute an actor handle ID for a new actor handle created by an +/// out-of-band serialization mechanism. +/// +/// \param actor_handle_id The actor handle ID of the existing actor handle. +/// \param current_task_id The current task ID. +/// \return Generated actor handle ID. +const ActorHandleID ComputeSerializedActorHandleId(const ActorHandleID &actor_handle_id, + const TaskID ¤t_task_id); template BaseID::BaseID() { diff --git a/src/ray/core_worker/actor_handle.cc b/src/ray/core_worker/actor_handle.cc new file mode 100644 index 000000000..576181546 --- /dev/null +++ b/src/ray/core_worker/actor_handle.cc @@ -0,0 +1,107 @@ +#include + +#include "ray/core_worker/actor_handle.h" + +namespace ray { + +ActorHandle::ActorHandle( + const class ActorID &actor_id, const class ActorHandleID &actor_handle_id, + const class JobID &job_id, const ObjectID &initial_cursor, + const Language actor_language, bool is_direct_call, + const std::vector &actor_creation_task_function_descriptor) { + inner_.set_actor_id(actor_id.Data(), actor_id.Size()); + inner_.set_actor_handle_id(actor_handle_id.Data(), actor_handle_id.Size()); + inner_.set_creation_job_id(job_id.Data(), job_id.Size()); + inner_.set_actor_language(actor_language); + *inner_.mutable_actor_creation_task_function_descriptor() = { + actor_creation_task_function_descriptor.begin(), + actor_creation_task_function_descriptor.end()}; + inner_.set_actor_cursor(initial_cursor.Binary()); + inner_.set_is_direct_call(is_direct_call); + // Increment the task counter to account for the actor creation task. + task_counter_++; +} + +std::unique_ptr ActorHandle::Fork() { + std::unique_lock guard(mutex_); + std::unique_ptr child = + std::unique_ptr(new ActorHandle(inner_)); + child->inner_ = inner_; + const class ActorHandleID new_actor_handle_id = + ComputeForkedActorHandleId(GetActorHandleID(), num_forks_++); + // Notify the backend to expect this new actor handle. The backend will + // not release the cursor for any new handles until the first task for + // each of the new handles is submitted. + // NOTE(swang): There is currently no garbage collection for actor + // handles until the actor itself is removed. + new_actor_handles_.push_back(new_actor_handle_id); + guard.unlock(); + + child->inner_.set_actor_handle_id(new_actor_handle_id.Data(), + new_actor_handle_id.Size()); + return child; +} + +std::unique_ptr ActorHandle::ForkForSerialization() { + std::unique_lock guard(mutex_); + std::unique_ptr child = + std::unique_ptr(new ActorHandle(inner_)); + child->inner_ = inner_; + // The execution dependency for a serialized actor handle is never safe + // to release, since it could be deserialized and submit another + // dependent task at any time. Therefore, we notify the backend of a + // random handle ID that will never actually be used. + new_actor_handles_.push_back(ActorHandleID::FromRandom()); + guard.unlock(); + + // We set the actor handle ID to nil to signal that this actor handle was + // created by an out-of-band fork. A new actor handle ID will be computed + // when the handle is deserialized. + const class ActorHandleID new_actor_handle_id = ActorHandleID::Nil(); + child->inner_.set_actor_handle_id(new_actor_handle_id.Data(), + new_actor_handle_id.Size()); + return child; +} + +ActorHandle::ActorHandle(const std::string &serialized, const TaskID ¤t_task_id) { + inner_.ParseFromString(serialized); + // If the actor handle ID is nil, this serialized handle was created by an out-of-band + // mechanism (see fork constructor above), so we compute a new actor handle ID. + // TODO(pcm): This still leads to a lot of actor handles being + // created, there should be a better way to handle serialized + // actor handles. + // TODO(swang): Deserializing the same actor handle twice in the same + // task will break the application, and deserializing it twice in the + // same actor is likely a performance bug. We should consider + // logging a warning in these cases. + if (ActorHandleID::FromBinary(inner_.actor_handle_id()).IsNil()) { + const class ActorHandleID new_actor_handle_id = ComputeSerializedActorHandleId( + ActorHandleID::FromBinary(inner_.actor_handle_id()), current_task_id); + inner_.set_actor_handle_id(new_actor_handle_id.Data(), new_actor_handle_id.Size()); + } +} + +void ActorHandle::SetActorTaskSpec(TaskSpecBuilder &builder, + const TaskTransportType transport_type, + const ObjectID new_cursor) { + std::unique_lock guard(mutex_); + // Build actor task spec. + const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(GetActorID()); + const ObjectID actor_creation_dummy_object_id = + ObjectID::ForTaskReturn(actor_creation_task_id, /*index=*/1, + /*transport_type=*/static_cast(transport_type)); + builder.SetActorTaskSpec(GetActorID(), GetActorHandleID(), + actor_creation_dummy_object_id, + /*previous_actor_task_dummy_object_id=*/ActorCursor(), + task_counter_++, new_actor_handles_); + + inner_.set_actor_cursor(new_cursor.Binary()); + new_actor_handles_.clear(); +} + +void ActorHandle::Serialize(std::string *output) { + std::unique_lock guard(mutex_); + inner_.SerializeToString(output); +} + +} // namespace ray diff --git a/src/ray/core_worker/actor_handle.h b/src/ray/core_worker/actor_handle.h new file mode 100644 index 000000000..c54d21a57 --- /dev/null +++ b/src/ray/core_worker/actor_handle.h @@ -0,0 +1,85 @@ +#ifndef RAY_CORE_WORKER_ACTOR_HANDLE_H +#define RAY_CORE_WORKER_ACTOR_HANDLE_H + +#include + +#include "ray/common/id.h" +#include "ray/common/task/task_util.h" +#include "ray/core_worker/common.h" +#include "ray/core_worker/context.h" +#include "ray/protobuf/core_worker.pb.h" + +namespace ray { + +class ActorHandle { + public: + ActorHandle(ray::rpc::ActorHandle inner) : inner_(inner) {} + + // Constructs a new ActorHandle as part of the actor creation process. + ActorHandle(const ActorID &actor_id, const ActorHandleID &actor_handle_id, + const JobID &job_id, const ObjectID &initial_cursor, + const Language actor_language, bool is_direct_call, + const std::vector &actor_creation_task_function_descriptor); + + /// Constructs an ActorHandle from a serialized string. + ActorHandle(const std::string &serialized, const TaskID ¤t_task_id); + + /// Forks a child ActorHandle. This will modify the handle to account for the newly + /// forked child handle. This should only be used for forks that are part of a Ray + /// API call (e.g., passing an actor handle into a remote function). + std::unique_ptr Fork(); + + /// Forks a child ActorHandle. This will *not* modify the handle to account for the + /// newly forked child handle. This should be used by application-level code for + /// serialization in order to pass an actor handle for uses not covered by the Ray API. + std::unique_ptr ForkForSerialization(); + + ActorID GetActorID() const { return ActorID::FromBinary(inner_.actor_id()); }; + + ActorHandleID GetActorHandleID() const { + return ActorHandleID::FromBinary(inner_.actor_handle_id()); + }; + + /// ID of the job that created the actor (it is possible that the handle + /// exists on a job with a different job ID). + JobID CreationJobID() const { return JobID::FromBinary(inner_.creation_job_id()); }; + + Language ActorLanguage() const { return inner_.actor_language(); }; + + std::vector ActorCreationTaskFunctionDescriptor() const { + return VectorFromProtobuf(inner_.actor_creation_task_function_descriptor()); + }; + + ObjectID ActorCursor() const { return ObjectID::FromBinary(inner_.actor_cursor()); } + + bool IsDirectCallActor() const { return inner_.is_direct_call(); } + + void SetActorTaskSpec(TaskSpecBuilder &builder, const TaskTransportType transport_type, + const ObjectID new_cursor); + + void Serialize(std::string *output); + + private: + // Protobuf-defined persistent state of the actor handle. + ray::rpc::ActorHandle inner_; + + // Number of times this handle has been forked. + uint64_t num_forks_ = 0; + + // Number of tasks that have been submitted on this handle. + uint64_t task_counter_ = 0; + + /// The new actor handles that were created from this handle + /// since the last task on this handle was submitted. This is + /// used to garbage-collect dummy objects that are no longer + /// necessary in the backend. + std::vector new_actor_handles_; + + std::mutex mutex_; + + FRIEND_TEST(ZeroNodeTest, TestActorHandle); +}; + +} // namespace ray + +#endif // RAY_CORE_WORKER_ACTOR_HANDLE_H diff --git a/src/ray/core_worker/core_worker.cc b/src/ray/core_worker/core_worker.cc index 75ad4902e..aeb1ed301 100644 --- a/src/ray/core_worker/core_worker.cc +++ b/src/ray/core_worker/core_worker.cc @@ -20,7 +20,7 @@ CoreWorker::CoreWorker( io_work_(io_service_) { // Initialize logging if log_dir is passed. Otherwise, it must be initialized // and cleaned up by the caller. - if (!log_dir_.empty()) { + if (log_dir_ != "") { std::stringstream app_name; app_name << LanguageString(language_) << "-" << WorkerTypeString(worker_type_) << "-" << worker_context_.GetWorkerID(); diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc index 1a91e0fb0..41715f3b6 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_actor_NativeRayActor.cc @@ -20,8 +20,7 @@ extern "C" { */ JNIEXPORT jlong JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeFork( JNIEnv *env, jclass o, jlong nativeActorHandle) { - auto new_actor_handle = GetActorHandle(nativeActorHandle).Fork(); - return reinterpret_cast(new ray::ActorHandle(new_actor_handle)); + return reinterpret_cast(GetActorHandle(nativeActorHandle).Fork().release()); } /* @@ -32,7 +31,7 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeFork( JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetActorId( JNIEnv *env, jclass o, jlong nativeActorHandle) { return IdToJavaByteArray(env, - GetActorHandle(nativeActorHandle).ActorID()); + GetActorHandle(nativeActorHandle).GetActorID()); } /* @@ -44,7 +43,7 @@ JNIEXPORT jbyteArray JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetActorHandleId( JNIEnv *env, jclass o, jlong nativeActorHandle) { return IdToJavaByteArray( - env, GetActorHandle(nativeActorHandle).ActorHandleID()); + env, GetActorHandle(nativeActorHandle).GetActorHandleID()); } /* @@ -62,7 +61,8 @@ JNIEXPORT jint JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeGetLangua * Method: nativeIsDirectCallActor * Signature: (J)Z */ -JNIEXPORT jboolean JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeIsDirectCallActor( +JNIEXPORT jboolean JNICALL +Java_org_ray_runtime_actor_NativeRayActor_nativeIsDirectCallActor( JNIEnv *env, jclass o, jlong nativeActorHandle) { return GetActorHandle(nativeActorHandle).IsDirectCallActor(); } @@ -104,8 +104,7 @@ JNIEXPORT jlong JNICALL Java_org_ray_runtime_actor_NativeRayActor_nativeDeserial auto buffer = JavaByteArrayToNativeBuffer(env, data); RAY_CHECK(buffer->Size() > 0); auto binary = std::string(reinterpret_cast(buffer->Data()), buffer->Size()); - return reinterpret_cast( - new ray::ActorHandle(ray::ActorHandle::Deserialize(binary))); + return reinterpret_cast(new ray::ActorHandle(binary, TaskID::Nil())); } /* diff --git a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc index a033bdfbd..129a49aa9 100644 --- a/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc +++ b/src/ray/core_worker/lib/java/org_ray_runtime_task_NativeTaskSubmitter.cc @@ -84,7 +84,7 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env, max_reconstructions = static_cast(env->GetIntField( actorCreationOptions, java_actor_creation_options_max_reconstructions)); use_direct_call = env->GetBooleanField(actorCreationOptions, - java_actor_creation_options_use_direct_call); + java_actor_creation_options_use_direct_call); jobject java_resources = env->GetObjectField(actorCreationOptions, java_base_task_options_resources); resources = ToResources(env, java_resources); @@ -101,7 +101,7 @@ inline ray::ActorCreationOptions ToActorCreationOptions(JNIEnv *env, } ray::ActorCreationOptions action_creation_options{ - static_cast(max_reconstructions), use_direct_call, resources, + static_cast(max_reconstructions), use_direct_call, resources, resources, dynamic_worker_options}; return action_creation_options; } diff --git a/src/ray/core_worker/task_interface.cc b/src/ray/core_worker/task_interface.cc index 5eeb4c32f..f6fb92a5c 100644 --- a/src/ray/core_worker/task_interface.cc +++ b/src/ray/core_worker/task_interface.cc @@ -1,103 +1,11 @@ #include "ray/core_worker/task_interface.h" #include "ray/core_worker/context.h" -#include "ray/core_worker/core_worker.h" #include "ray/core_worker/task_interface.h" #include "ray/core_worker/transport/direct_actor_transport.h" #include "ray/core_worker/transport/raylet_transport.h" namespace ray { -ActorHandle::ActorHandle( - const class ActorID &actor_id, const class ActorHandleID &actor_handle_id, - const Language actor_language, bool is_direct_call, - const std::vector &actor_creation_task_function_descriptor) { - inner_.set_actor_id(actor_id.Data(), actor_id.Size()); - inner_.set_actor_handle_id(actor_handle_id.Data(), actor_handle_id.Size()); - inner_.set_actor_language(actor_language); - *inner_.mutable_actor_creation_task_function_descriptor() = { - actor_creation_task_function_descriptor.begin(), - actor_creation_task_function_descriptor.end()}; - const auto &actor_creation_task_id = TaskID::ForActorCreationTask(actor_id); - const auto &actor_creation_dummy_object_id = - ObjectID::ForTaskReturn(actor_creation_task_id, /*index=*/1, /*transport_type=*/0); - inner_.set_actor_cursor(actor_creation_dummy_object_id.Data(), - actor_creation_dummy_object_id.Size()); - inner_.set_is_direct_call(is_direct_call); -} - -ActorHandle::ActorHandle(const ActorHandle &other) - : inner_(other.inner_), new_actor_handles_(other.new_actor_handles_) {} - -ray::ActorID ActorHandle::ActorID() const { - return ActorID::FromBinary(inner_.actor_id()); -}; - -ray::ActorHandleID ActorHandle::ActorHandleID() const { - return ActorHandleID::FromBinary(inner_.actor_handle_id()); -}; - -Language ActorHandle::ActorLanguage() const { return inner_.actor_language(); }; - -std::vector ActorHandle::ActorCreationTaskFunctionDescriptor() const { - return VectorFromProtobuf(inner_.actor_creation_task_function_descriptor()); -}; - -ObjectID ActorHandle::ActorCursor() const { - return ObjectID::FromBinary(inner_.actor_cursor()); -}; - -int64_t ActorHandle::TaskCounter() const { return inner_.task_counter(); }; - -int64_t ActorHandle::NumForks() const { return inner_.num_forks(); }; - -bool ActorHandle::IsDirectCallActor() const { return inner_.is_direct_call(); } - -ActorHandle ActorHandle::Fork() { - ActorHandle new_handle; - std::unique_lock guard(mutex_); - new_handle.inner_ = inner_; - inner_.set_num_forks(inner_.num_forks() + 1); - const auto next_actor_handle_id = ComputeNextActorHandleId( - ActorHandleID::FromBinary(inner_.actor_handle_id()), inner_.num_forks()); - new_handle.inner_.set_actor_handle_id(next_actor_handle_id.Data(), - next_actor_handle_id.Size()); - new_actor_handles_.push_back(next_actor_handle_id); - guard.unlock(); - - new_handle.inner_.set_task_counter(0); - new_handle.inner_.set_num_forks(0); - return new_handle; -} - -void ActorHandle::Serialize(std::string *output) { - std::unique_lock guard(mutex_); - inner_.SerializeToString(output); -} - -ActorHandle ActorHandle::Deserialize(const std::string &data) { - ActorHandle ret; - ret.inner_.ParseFromString(data); - return ret; -} - -ActorHandle::ActorHandle() {} - -void ActorHandle::SetActorCursor(const ObjectID &actor_cursor) { - inner_.set_actor_cursor(actor_cursor.Binary()); -}; - -int64_t ActorHandle::IncreaseTaskCounter() { - int64_t old = inner_.task_counter(); - inner_.set_task_counter(old + 1); - return old; -} - -std::vector ActorHandle::NewActorHandles() const { - return new_actor_handles_; -} - -void ActorHandle::ClearNewActorHandles() { new_actor_handles_.clear(); } - CoreWorkerTaskInterface::CoreWorkerTaskInterface( WorkerContext &worker_context, std::unique_ptr &raylet_client, CoreWorkerObjectInterface &object_interface, boost::asio::io_service &io_service, @@ -115,16 +23,17 @@ CoreWorkerTaskInterface::CoreWorkerTaskInterface( } void CoreWorkerTaskInterface::BuildCommonTaskSpec( - TaskSpecBuilder &builder, const TaskID &task_id, const int task_index, - const RayFunction &function, const std::vector &args, uint64_t num_returns, + TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, + const int task_index, const RayFunction &function, const std::vector &args, + uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, TaskTransportType transport_type, std::vector *return_ids) { // Build common task spec. - builder.SetCommonTaskSpec( - task_id, function.GetLanguage(), function.GetFunctionDescriptor(), - worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), task_index, - num_returns, required_resources, required_placement_resources); + builder.SetCommonTaskSpec(task_id, function.GetLanguage(), + function.GetFunctionDescriptor(), job_id, + worker_context_.GetCurrentTaskID(), task_index, num_returns, + required_resources, required_placement_resources); // Set task arguments. for (const auto &arg : args) { if (arg.IsPassedByReference()) { @@ -152,9 +61,9 @@ Status CoreWorkerTaskInterface::SubmitTask(const RayFunction &function, const auto task_id = TaskID::ForNormalTask(worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), next_task_index); - BuildCommonTaskSpec(builder, task_id, next_task_index, function, args, - task_options.num_returns, task_options.resources, {}, - TaskTransportType::RAYLET, return_ids); + BuildCommonTaskSpec(builder, worker_context_.GetCurrentJobID(), task_id, + next_task_index, function, args, task_options.num_returns, + task_options.resources, {}, TaskTransportType::RAYLET, return_ids); return task_submitters_[TaskTransportType::RAYLET]->SubmitTask(builder.Build()); } @@ -167,20 +76,21 @@ Status CoreWorkerTaskInterface::CreateActor( ActorID::Of(worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), next_task_index); const TaskID actor_creation_task_id = TaskID::ForActorCreationTask(actor_id); + const JobID job_id = worker_context_.GetCurrentJobID(); std::vector return_ids; TaskSpecBuilder builder; - BuildCommonTaskSpec(builder, actor_creation_task_id, next_task_index, function, args, 1, - actor_creation_options.resources, actor_creation_options.resources, + BuildCommonTaskSpec(builder, job_id, actor_creation_task_id, next_task_index, function, + args, 1, actor_creation_options.resources, + actor_creation_options.placement_resources, TaskTransportType::RAYLET, &return_ids); builder.SetActorCreationTaskSpec(actor_id, actor_creation_options.max_reconstructions, actor_creation_options.dynamic_worker_options, actor_creation_options.is_direct_call); *actor_handle = std::unique_ptr(new ActorHandle( - actor_id, ActorHandleID::Nil(), function.GetLanguage(), - actor_creation_options.is_direct_call, function.GetFunctionDescriptor())); - (*actor_handle)->IncreaseTaskCounter(); - (*actor_handle)->SetActorCursor(return_ids[0]); + actor_id, ActorHandleID::Nil(), job_id, /*actor_cursor=*/return_ids[0], + function.GetLanguage(), actor_creation_options.is_direct_call, + function.GetFunctionDescriptor())); return task_submitters_[TaskTransportType::RAYLET]->SubmitTask(builder.Build()); } @@ -191,46 +101,29 @@ Status CoreWorkerTaskInterface::SubmitActorTask(ActorHandle &actor_handle, const TaskOptions &task_options, std::vector *return_ids) { // Add one for actor cursor object id for tasks. - const auto num_returns = task_options.num_returns + 1; + const int num_returns = task_options.num_returns + 1; const bool is_direct_call = actor_handle.IsDirectCallActor(); - const auto transport_type = + const TaskTransportType transport_type = is_direct_call ? TaskTransportType::DIRECT_ACTOR : TaskTransportType::RAYLET; // Build common task spec. TaskSpecBuilder builder; const int next_task_index = worker_context_.GetNextTaskIndex(); - const auto actor_task_id = TaskID::ForActorTask( + const TaskID actor_task_id = TaskID::ForActorTask( worker_context_.GetCurrentJobID(), worker_context_.GetCurrentTaskID(), - next_task_index, actor_handle.ActorID()); - BuildCommonTaskSpec(builder, actor_task_id, next_task_index, function, args, - num_returns, task_options.resources, {}, transport_type, - return_ids); + next_task_index, actor_handle.GetActorID()); + BuildCommonTaskSpec(builder, actor_handle.CreationJobID(), actor_task_id, + next_task_index, function, args, num_returns, + task_options.resources, {}, transport_type, return_ids); - std::unique_lock guard(actor_handle.mutex_); - // Build actor task spec. - const auto actor_creation_task_id = - TaskID::ForActorCreationTask(actor_handle.ActorID()); - const auto actor_creation_dummy_object_id = - ObjectID::ForTaskReturn(actor_creation_task_id, /*index=*/1, - /*transport_type=*/static_cast(transport_type)); - builder.SetActorTaskSpec( - actor_handle.ActorID(), actor_handle.ActorHandleID(), - actor_creation_dummy_object_id, - /*previous_actor_task_dummy_object_id=*/actor_handle.ActorCursor(), - actor_handle.IncreaseTaskCounter(), actor_handle.NewActorHandles()); - - // Manipulate actor handle state. - auto actor_cursor = (*return_ids).back(); - actor_handle.SetActorCursor(actor_cursor); - actor_handle.ClearNewActorHandles(); - - guard.unlock(); + const ObjectID new_cursor = return_ids->back(); + actor_handle.SetActorTaskSpec(builder, transport_type, new_cursor); // Submit task. auto status = task_submitters_[transport_type]->SubmitTask(builder.Build()); // Remove cursor from return ids. - (*return_ids).pop_back(); + return_ids->pop_back(); return status; } diff --git a/src/ray/core_worker/task_interface.h b/src/ray/core_worker/task_interface.h index 802a7a504..ea4201848 100644 --- a/src/ray/core_worker/task_interface.h +++ b/src/ray/core_worker/task_interface.h @@ -8,6 +8,7 @@ #include "ray/common/task/task.h" #include "ray/common/task/task_spec.h" #include "ray/common/task/task_util.h" +#include "ray/core_worker/actor_handle.h" #include "ray/core_worker/common.h" #include "ray/core_worker/context.h" #include "ray/core_worker/object_interface.h" @@ -17,8 +18,6 @@ namespace ray { -class CoreWorker; - /// Options of a non-actor-creation task. struct TaskOptions { TaskOptions() {} @@ -36,10 +35,12 @@ struct ActorCreationOptions { ActorCreationOptions() {} ActorCreationOptions(uint64_t max_reconstructions, bool is_direct_call, const std::unordered_map &resources, + const std::unordered_map &placement_resources, const std::vector &dynamic_worker_options) : max_reconstructions(max_reconstructions), is_direct_call(is_direct_call), resources(resources), + placement_resources(placement_resources), dynamic_worker_options(dynamic_worker_options) {} /// Maximum number of times that the actor should be reconstructed when it dies @@ -50,81 +51,13 @@ struct ActorCreationOptions { const bool is_direct_call = false; /// Resources required by the whole lifetime of this actor. const std::unordered_map resources; + /// Resources required to place this actor. + const std::unordered_map placement_resources; /// The dynamic options used in the worker command when starting a worker process for /// an actor creation task. const std::vector dynamic_worker_options; }; -/// A handle to an actor. -class ActorHandle { - public: - ActorHandle(const ActorID &actor_id, const ActorHandleID &actor_handle_id, - const Language actor_language, bool is_direct_call, - const std::vector &actor_creation_task_function_descriptor); - - ActorHandle(const ActorHandle &other); - - /// ID of the actor. - ray::ActorID ActorID() const; - - /// ID of this actor handle. - ray::ActorHandleID ActorHandleID() const; - - /// Language of the actor. - Language ActorLanguage() const; - - // Function descriptor of actor creation task. - std::vector ActorCreationTaskFunctionDescriptor() const; - - /// The unique id of the last return of the last task. - /// It's used as a dependency for the next task. - ObjectID ActorCursor() const; - - /// The number of tasks that have been invoked on this actor. - int64_t TaskCounter() const; - - /// The number of times that this actor handle has been forked. - /// It's used to make sure ids of actor handles are unique. - int64_t NumForks() const; - - /// Whether direct call is used. If this is true, then the tasks - /// are submitted directly to the actor without going through raylet. - bool IsDirectCallActor() const; - - ActorHandle Fork(); - - void Serialize(std::string *output); - - static ActorHandle Deserialize(const std::string &data); - - private: - ActorHandle(); - - /// Set actor cursor. - void SetActorCursor(const ObjectID &actor_cursor); - - /// Increase task counter. - int64_t IncreaseTaskCounter(); - - std::vector NewActorHandles() const; - - void ClearNewActorHandles(); - - private: - /// Protobuf defined ActorHandle. - ray::rpc::ActorHandle inner_; - /// The new actor handles that were created from this handle - /// since the last task on this handle was submitted. This is - /// used to garbage-collect dummy objects that are no longer - /// necessary in the backend. - std::vector new_actor_handles_; - - /// Mutex to protect mutable fields. - std::mutex mutex_; - - friend class CoreWorkerTaskInterface; -}; - /// The interface that contains all `CoreWorker` methods that are related to task /// submission. class CoreWorkerTaskInterface { @@ -173,6 +106,7 @@ class CoreWorkerTaskInterface { /// Build common attributes of the task spec, and compute return ids. /// /// \param[in] builder Builder to build a `TaskSpec`. + /// \param[in] job_id The ID of the job submitting the task. /// \param[in] task_id The ID of this task. /// \param[in] task_index The task index used to build this task. /// \param[in] function The remote function to execute. @@ -185,8 +119,9 @@ class CoreWorkerTaskInterface { /// \param[out] return_ids Return IDs. /// \return Void. void BuildCommonTaskSpec( - TaskSpecBuilder &builder, const TaskID &task_id, const int task_index, - const RayFunction &function, const std::vector &args, uint64_t num_returns, + TaskSpecBuilder &builder, const JobID &job_id, const TaskID &task_id, + const int task_index, const RayFunction &function, const std::vector &args, + uint64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, TaskTransportType transport_type, std::vector *return_ids); diff --git a/src/ray/core_worker/test/core_worker_test.cc b/src/ray/core_worker/test/core_worker_test.cc index bb3f2acb4..04bead63e 100644 --- a/src/ray/core_worker/test/core_worker_test.cc +++ b/src/ray/core_worker/test/core_worker_test.cc @@ -61,7 +61,8 @@ std::unique_ptr CreateActorHelper( std::vector args; args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); - ActorCreationOptions actor_options{max_reconstructions, is_direct_call, resources, {}}; + ActorCreationOptions actor_options{ + max_reconstructions, is_direct_call, resources, resources, {}}; // Create an actor. RAY_CHECK_OK(worker.Tasks().CreateActor(func, args, actor_options, &actor_handle)); @@ -358,7 +359,7 @@ void CoreWorkerTest::TestActorReconstruction( auto actor_handle = CreateActorHelper(driver, resources, is_direct_call, 1000); // Wait for actor alive event. - ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->ActorID(), true, + ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->GetActorID(), true, 30 * 1000 /* 30s */)); RAY_LOG(INFO) << "actor has been created"; @@ -373,9 +374,9 @@ void CoreWorkerTest::TestActorReconstruction( ASSERT_EQ(system("pkill mock_worker"), 0); // Wait for actor restruction event, and then for alive event. - ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->ActorID(), false, + ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->GetActorID(), false, 30 * 1000 /* 30s */)); - ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->ActorID(), true, + ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->GetActorID(), true, 30 * 1000 /* 30s */)); RAY_LOG(INFO) << "actor has been reconstructed"; @@ -649,10 +650,12 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); std::unordered_map resources; - ActorCreationOptions actor_options{0, /*is_direct_call*/ true, resources, {}}; + ActorCreationOptions actor_options{ + 0, /*is_direct_call*/ true, resources, resources, {}}; const auto job_id = NextJobId(); ActorHandle actor_handle(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), - ActorHandleID::Nil(), function.GetLanguage(), true, + ActorHandleID::Nil(), job_id, ObjectID::FromRandom(), + function.GetLanguage(), true, function.GetFunctionDescriptor()); // Manually create `num_tasks` task specs, and for each of them create a @@ -679,13 +682,8 @@ TEST_F(ZeroNodeTest, TestTaskSpecPerf) { } } - const auto actor_creation_dummy_object_id = - ObjectID::ForTaskReturn(TaskID::ForActorCreationTask(actor_handle.ActorID()), - /*index=*/1, /*transport_type=*/0); - builder.SetActorTaskSpec( - actor_handle.ActorID(), actor_handle.ActorHandleID(), - actor_creation_dummy_object_id, - /*previous_actor_task_dummy_object_id=*/actor_handle.ActorCursor(), 0, {}); + actor_handle.SetActorTaskSpec(builder, TaskTransportType::RAYLET, + ObjectID::FromRandom()); const auto &task_spec = builder.Build(); @@ -712,11 +710,12 @@ TEST_F(SingleNodeTest, TestDirectActorTaskSubmissionPerf) { args.emplace_back(TaskArg::PassByValue(std::make_shared(buffer, nullptr))); std::unordered_map resources; - ActorCreationOptions actor_options{0, /*is_direct_call*/ true, resources, {}}; + ActorCreationOptions actor_options{ + 0, /*is_direct_call*/ true, resources, resources, {}}; // Create an actor. RAY_CHECK_OK(driver.Tasks().CreateActor(func, args, actor_options, &actor_handle)); // wait for actor creation finish. - ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->ActorID(), true, + ASSERT_TRUE(WaitForDirectCallActorState(driver, actor_handle->GetActorID(), true, 30 * 1000 /* 30s */)); // Test submitting some tasks with by-value args for that actor. int64_t start_ms = current_time_ms(); @@ -777,37 +776,75 @@ TEST_F(ZeroNodeTest, TestWorkerContext) { } TEST_F(ZeroNodeTest, TestActorHandle) { - const auto job_id = NextJobId(); - ActorHandle handle1(ActorID::Of(job_id, TaskID::ForDriverTask(job_id), 1), - ActorHandleID::FromRandom(), Language::JAVA, false, - {"org.ray.exampleClass", "exampleMethod", "exampleSignature"}); + const JobID job_id = NextJobId(); + const TaskID task_id = TaskID::ForDriverTask(job_id); + const ActorHandleID actor_handle_id = ActorHandleID::FromRandom(); + ActorHandle parent(ActorID::Of(job_id, task_id, 1), actor_handle_id, job_id, + ObjectID::FromRandom(), Language::JAVA, false, + {"org.ray.exampleClass", "exampleMethod", "exampleSignature"}); - auto forkedHandle1 = handle1.Fork(); - ASSERT_EQ(1, handle1.NumForks()); - ASSERT_EQ(handle1.ActorID(), forkedHandle1.ActorID()); - ASSERT_NE(handle1.ActorHandleID(), forkedHandle1.ActorHandleID()); - ASSERT_EQ(handle1.ActorLanguage(), forkedHandle1.ActorLanguage()); - ASSERT_EQ(handle1.ActorCreationTaskFunctionDescriptor(), - forkedHandle1.ActorCreationTaskFunctionDescriptor()); - ASSERT_EQ(handle1.ActorCursor(), forkedHandle1.ActorCursor()); - ASSERT_EQ(0, forkedHandle1.TaskCounter()); - ASSERT_EQ(0, forkedHandle1.NumForks()); - auto forkedHandle2 = handle1.Fork(); - ASSERT_EQ(2, handle1.NumForks()); - ASSERT_EQ(0, forkedHandle2.TaskCounter()); - ASSERT_EQ(0, forkedHandle2.NumForks()); + // Test in-band forking logic. + std::unique_ptr forkedHandle1 = parent.Fork(); + ASSERT_EQ(1, parent.num_forks_); + ASSERT_EQ(parent.GetActorID(), forkedHandle1->GetActorID()); + ASSERT_EQ(actor_handle_id, parent.GetActorHandleID()); + ASSERT_NE(parent.GetActorHandleID(), forkedHandle1->GetActorHandleID()); + ASSERT_EQ(parent.ActorLanguage(), forkedHandle1->ActorLanguage()); + ASSERT_EQ(parent.ActorCreationTaskFunctionDescriptor(), + forkedHandle1->ActorCreationTaskFunctionDescriptor()); + ASSERT_EQ(parent.ActorCursor(), forkedHandle1->ActorCursor()); + ASSERT_EQ(0, forkedHandle1->task_counter_); + ASSERT_EQ(0, forkedHandle1->num_forks_); + ASSERT_EQ(parent.new_actor_handles_.size(), 1); + ASSERT_EQ(parent.new_actor_handles_.back(), forkedHandle1->GetActorHandleID()); + parent.new_actor_handles_.clear(); - std::string buffer; - handle1.Serialize(&buffer); - auto handle2 = ActorHandle::Deserialize(buffer); - ASSERT_EQ(handle1.ActorID(), handle2.ActorID()); - ASSERT_EQ(handle1.ActorHandleID(), handle2.ActorHandleID()); - ASSERT_EQ(handle1.ActorLanguage(), handle2.ActorLanguage()); - ASSERT_EQ(handle1.ActorCreationTaskFunctionDescriptor(), - handle2.ActorCreationTaskFunctionDescriptor()); - ASSERT_EQ(handle1.ActorCursor(), handle2.ActorCursor()); - ASSERT_EQ(handle1.TaskCounter(), handle2.TaskCounter()); - ASSERT_EQ(handle1.NumForks(), handle2.NumForks()); + std::unique_ptr forkedHandle2 = parent.Fork(); + ASSERT_EQ(2, parent.num_forks_); + ASSERT_EQ(0, forkedHandle2->task_counter_); + ASSERT_EQ(0, forkedHandle2->num_forks_); + ASSERT_EQ(parent.new_actor_handles_.size(), 1); + ASSERT_EQ(parent.new_actor_handles_.back(), forkedHandle2->GetActorHandleID()); + parent.new_actor_handles_.clear(); + + // Test serialization and deserialization for in-band fork. + std::string buffer1; + forkedHandle2->Serialize(&buffer1); + ActorHandle deserializedHandle1(buffer1, task_id); + ASSERT_EQ(forkedHandle2->GetActorID(), deserializedHandle1.GetActorID()); + ASSERT_EQ(forkedHandle2->GetActorHandleID(), deserializedHandle1.GetActorHandleID()); + ASSERT_EQ(forkedHandle2->ActorLanguage(), deserializedHandle1.ActorLanguage()); + ASSERT_EQ(forkedHandle2->ActorCreationTaskFunctionDescriptor(), + deserializedHandle1.ActorCreationTaskFunctionDescriptor()); + ASSERT_EQ(forkedHandle2->ActorCursor(), deserializedHandle1.ActorCursor()); + + // Test out-of-band forking logic. + std::unique_ptr forkedHandle3 = parent.ForkForSerialization(); + ASSERT_EQ(2, parent.num_forks_); + ASSERT_EQ(parent.GetActorID(), forkedHandle3->GetActorID()); + ASSERT_EQ(actor_handle_id, parent.GetActorHandleID()); + ASSERT_NE(parent.GetActorHandleID(), forkedHandle3->GetActorHandleID()); + ASSERT_NE(forkedHandle2->GetActorHandleID(), forkedHandle3->GetActorHandleID()); + ASSERT_EQ(parent.ActorLanguage(), forkedHandle3->ActorLanguage()); + ASSERT_EQ(parent.ActorCreationTaskFunctionDescriptor(), + forkedHandle3->ActorCreationTaskFunctionDescriptor()); + ASSERT_EQ(parent.ActorCursor(), forkedHandle3->ActorCursor()); + ASSERT_EQ(0, forkedHandle3->task_counter_); + ASSERT_EQ(0, forkedHandle3->num_forks_); + ASSERT_EQ(parent.new_actor_handles_.size(), 1); + ASSERT_NE(parent.new_actor_handles_.back(), forkedHandle3->GetActorHandleID()); + parent.new_actor_handles_.clear(); + + // Test serialization and deserialization for out-of-band fork. + std::string buffer2; + forkedHandle3->Serialize(&buffer2); + ActorHandle deserializedHandle2(buffer2, task_id); + ASSERT_EQ(forkedHandle3->GetActorID(), deserializedHandle2.GetActorID()); + ASSERT_NE(forkedHandle3->GetActorHandleID(), deserializedHandle2.GetActorHandleID()); + ASSERT_EQ(forkedHandle3->ActorLanguage(), deserializedHandle2.ActorLanguage()); + ASSERT_EQ(forkedHandle3->ActorCreationTaskFunctionDescriptor(), + deserializedHandle2.ActorCreationTaskFunctionDescriptor()); + ASSERT_EQ(forkedHandle3->ActorCursor(), deserializedHandle2.ActorCursor()); } TEST_F(SingleNodeTest, TestMemoryStoreProvider) { diff --git a/src/ray/gcs/redis_gcs_client_test.cc b/src/ray/gcs/redis_gcs_client_test.cc index 2ede4616d..dc26e32ec 100644 --- a/src/ray/gcs/redis_gcs_client_test.cc +++ b/src/ray/gcs/redis_gcs_client_test.cc @@ -95,7 +95,7 @@ std::shared_ptr CreateTaskTableData(const TaskID &task_id, return data; } -/// A helper function that compare wether 2 `TaskTableData` objects are equal. +/// A helper function that compare whether 2 `TaskTableData` objects are equal. /// Note, this function only compares fields set by `CreateTaskTableData`. bool TaskTableDataEqual(const TaskTableData &data1, const TaskTableData &data2) { const auto &spec1 = data1.task().task_spec(); diff --git a/src/ray/protobuf/core_worker.proto b/src/ray/protobuf/core_worker.proto index 77c807fbb..a50d1a1e9 100644 --- a/src/ray/protobuf/core_worker.proto +++ b/src/ray/protobuf/core_worker.proto @@ -4,6 +4,7 @@ package ray.rpc; import "src/ray/protobuf/common.proto"; +// Persistent state of an ActorHandle. message ActorHandle { // ID of the actor. bytes actor_id = 1; @@ -11,23 +12,20 @@ message ActorHandle { // ID of this actor handle. bytes actor_handle_id = 2; + // ID of the job that created the actor (it is possible that the handle + // exists on a job with a different job ID). + bytes creation_job_id = 3; + // Language of the actor. - Language actor_language = 3; + Language actor_language = 4; // Function descriptor of actor creation task. - repeated string actor_creation_task_function_descriptor = 4; + repeated string actor_creation_task_function_descriptor = 5; // The unique id of the last return of the last task. // It's used as a dependency for the next task. - bytes actor_cursor = 5; - - // The number of tasks that have been invoked on this actor. - int64 task_counter = 6; - - // The number of times that this actor handle has been forked. - // It's used to make sure ids of actor handles are unique. - int64 num_forks = 7; + bytes actor_cursor = 6; // Whether direct actor call is used. - bool is_direct_call = 8; + bool is_direct_call = 7; }