diff --git a/java/runtime/src/main/java/org/ray/runtime/generated/TaskInfo.java b/java/runtime/src/main/java/org/ray/runtime/generated/TaskInfo.java index 4e17e45a7..cf413a913 100644 --- a/java/runtime/src/main/java/org/ray/runtime/generated/TaskInfo.java +++ b/java/runtime/src/main/java/org/ray/runtime/generated/TaskInfo.java @@ -38,20 +38,22 @@ public final class TaskInfo extends Table { public ByteBuffer actorHandleIdInByteBuffer(ByteBuffer _bb) { return __vector_in_bytebuffer(_bb, 20, 1); } public int actorCounter() { int o = __offset(22); return o != 0 ? bb.getInt(o + bb_pos) : 0; } public boolean isActorCheckpointMethod() { int o = __offset(24); return o != 0 ? 0!=bb.get(o + bb_pos) : false; } + public String newActorHandles(int j) { int o = __offset(26); return o != 0 ? __string(__vector(o) + j * 4) : null; } + public int newActorHandlesLength() { int o = __offset(26); return o != 0 ? __vector_len(o) : 0; } public Arg args(int j) { return args(new Arg(), j); } - public Arg args(Arg obj, int j) { int o = __offset(26); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } - public int argsLength() { int o = __offset(26); return o != 0 ? __vector_len(o) : 0; } - public String returns(int j) { int o = __offset(28); return o != 0 ? __string(__vector(o) + j * 4) : null; } - public int returnsLength() { int o = __offset(28); return o != 0 ? __vector_len(o) : 0; } + public Arg args(Arg obj, int j) { int o = __offset(28); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } + public int argsLength() { int o = __offset(28); return o != 0 ? __vector_len(o) : 0; } + public String returns(int j) { int o = __offset(30); return o != 0 ? __string(__vector(o) + j * 4) : null; } + public int returnsLength() { int o = __offset(30); return o != 0 ? __vector_len(o) : 0; } public ResourcePair requiredResources(int j) { return requiredResources(new ResourcePair(), j); } - public ResourcePair requiredResources(ResourcePair obj, int j) { int o = __offset(30); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } - public int requiredResourcesLength() { int o = __offset(30); return o != 0 ? __vector_len(o) : 0; } + public ResourcePair requiredResources(ResourcePair obj, int j) { int o = __offset(32); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } + public int requiredResourcesLength() { int o = __offset(32); return o != 0 ? __vector_len(o) : 0; } public ResourcePair requiredPlacementResources(int j) { return requiredPlacementResources(new ResourcePair(), j); } - public ResourcePair requiredPlacementResources(ResourcePair obj, int j) { int o = __offset(32); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } - public int requiredPlacementResourcesLength() { int o = __offset(32); return o != 0 ? __vector_len(o) : 0; } - public int language() { int o = __offset(34); return o != 0 ? bb.getInt(o + bb_pos) : 0; } - public String functionDescriptor(int j) { int o = __offset(36); return o != 0 ? __string(__vector(o) + j * 4) : null; } - public int functionDescriptorLength() { int o = __offset(36); return o != 0 ? __vector_len(o) : 0; } + public ResourcePair requiredPlacementResources(ResourcePair obj, int j) { int o = __offset(34); return o != 0 ? obj.__assign(__indirect(__vector(o) + j * 4), bb) : null; } + public int requiredPlacementResourcesLength() { int o = __offset(34); return o != 0 ? __vector_len(o) : 0; } + public int language() { int o = __offset(36); return o != 0 ? bb.getInt(o + bb_pos) : 0; } + public String functionDescriptor(int j) { int o = __offset(38); return o != 0 ? __string(__vector(o) + j * 4) : null; } + public int functionDescriptorLength() { int o = __offset(38); return o != 0 ? __vector_len(o) : 0; } public static int createTaskInfo(FlatBufferBuilder builder, int driver_idOffset, @@ -65,19 +67,21 @@ public final class TaskInfo extends Table { int actor_handle_idOffset, int actor_counter, boolean is_actor_checkpoint_method, + int new_actor_handlesOffset, int argsOffset, int returnsOffset, int required_resourcesOffset, int required_placement_resourcesOffset, int language, int function_descriptorOffset) { - builder.startObject(17); + builder.startObject(18); TaskInfo.addFunctionDescriptor(builder, function_descriptorOffset); TaskInfo.addLanguage(builder, language); TaskInfo.addRequiredPlacementResources(builder, required_placement_resourcesOffset); TaskInfo.addRequiredResources(builder, required_resourcesOffset); TaskInfo.addReturns(builder, returnsOffset); TaskInfo.addArgs(builder, argsOffset); + TaskInfo.addNewActorHandles(builder, new_actor_handlesOffset); TaskInfo.addActorCounter(builder, actor_counter); TaskInfo.addActorHandleId(builder, actor_handle_idOffset); TaskInfo.addActorId(builder, actor_idOffset); @@ -92,7 +96,7 @@ public final class TaskInfo extends Table { return TaskInfo.endTaskInfo(builder); } - public static void startTaskInfo(FlatBufferBuilder builder) { builder.startObject(17); } + public static void startTaskInfo(FlatBufferBuilder builder) { builder.startObject(18); } public static void addDriverId(FlatBufferBuilder builder, int driverIdOffset) { builder.addOffset(0, driverIdOffset, 0); } public static void addTaskId(FlatBufferBuilder builder, int taskIdOffset) { builder.addOffset(1, taskIdOffset, 0); } public static void addParentTaskId(FlatBufferBuilder builder, int parentTaskIdOffset) { builder.addOffset(2, parentTaskIdOffset, 0); } @@ -104,20 +108,23 @@ public final class TaskInfo extends Table { public static void addActorHandleId(FlatBufferBuilder builder, int actorHandleIdOffset) { builder.addOffset(8, actorHandleIdOffset, 0); } public static void addActorCounter(FlatBufferBuilder builder, int actorCounter) { builder.addInt(9, actorCounter, 0); } public static void addIsActorCheckpointMethod(FlatBufferBuilder builder, boolean isActorCheckpointMethod) { builder.addBoolean(10, isActorCheckpointMethod, false); } - public static void addArgs(FlatBufferBuilder builder, int argsOffset) { builder.addOffset(11, argsOffset, 0); } + public static void addNewActorHandles(FlatBufferBuilder builder, int newActorHandlesOffset) { builder.addOffset(11, newActorHandlesOffset, 0); } + public static int createNewActorHandlesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } + public static void startNewActorHandlesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } + public static void addArgs(FlatBufferBuilder builder, int argsOffset) { builder.addOffset(12, argsOffset, 0); } public static int createArgsVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } public static void startArgsVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addReturns(FlatBufferBuilder builder, int returnsOffset) { builder.addOffset(12, returnsOffset, 0); } + public static void addReturns(FlatBufferBuilder builder, int returnsOffset) { builder.addOffset(13, returnsOffset, 0); } public static int createReturnsVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } public static void startReturnsVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addRequiredResources(FlatBufferBuilder builder, int requiredResourcesOffset) { builder.addOffset(13, requiredResourcesOffset, 0); } + public static void addRequiredResources(FlatBufferBuilder builder, int requiredResourcesOffset) { builder.addOffset(14, requiredResourcesOffset, 0); } public static int createRequiredResourcesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } public static void startRequiredResourcesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addRequiredPlacementResources(FlatBufferBuilder builder, int requiredPlacementResourcesOffset) { builder.addOffset(14, requiredPlacementResourcesOffset, 0); } + public static void addRequiredPlacementResources(FlatBufferBuilder builder, int requiredPlacementResourcesOffset) { builder.addOffset(15, requiredPlacementResourcesOffset, 0); } public static int createRequiredPlacementResourcesVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } public static void startRequiredPlacementResourcesVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } - public static void addLanguage(FlatBufferBuilder builder, int language) { builder.addInt(15, language, 0); } - public static void addFunctionDescriptor(FlatBufferBuilder builder, int functionDescriptorOffset) { builder.addOffset(16, functionDescriptorOffset, 0); } + public static void addLanguage(FlatBufferBuilder builder, int language) { builder.addInt(16, language, 0); } + public static void addFunctionDescriptor(FlatBufferBuilder builder, int functionDescriptorOffset) { builder.addOffset(17, functionDescriptorOffset, 0); } public static int createFunctionDescriptorVector(FlatBufferBuilder builder, int[] data) { builder.startVector(4, data.length, 4); for (int i = data.length - 1; i >= 0; i--) builder.addOffset(data[i]); return builder.endVector(); } public static void startFunctionDescriptorVector(FlatBufferBuilder builder, int numElems) { builder.startVector(4, numElems, 4); } public static int endTaskInfo(FlatBufferBuilder builder) { @@ -132,7 +139,7 @@ public final class TaskInfo extends Table { * TODO(yuhguo): fix this error-prone funciton. */ public ByteBuffer returnsAsByteBuffer(int j) { - int o = __offset(28); + int o = __offset(30); if (o == 0) { return null; } diff --git a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java index f658d3b16..511b24b22 100644 --- a/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java +++ b/java/runtime/src/main/java/org/ray/runtime/raylet/RayletClientImpl.java @@ -191,6 +191,12 @@ public class RayletClientImpl implements RayletClient { final int actorIdOffset = fbb.createString(task.actorId.toByteBuffer()); final int actorHandleIdOffset = fbb.createString(task.actorHandleId.toByteBuffer()); final int actorCounter = task.actorCounter; + // Serialize the new actor handles. + int[] newActorHandlesOffsets = new int[task.newActorHandles.length]; + for (int i = 0; i < newActorHandlesOffsets.length; i++) { + newActorHandlesOffsets[i] = fbb.createString(task.newActorHandles[i].toByteBuffer()); + } + int newActorHandlesOffset = fbb.createVectorOfTables(newActorHandlesOffsets); // Serialize args int[] argsOffsets = new int[task.args.length]; for (int i = 0; i < argsOffsets.length; i++) { @@ -252,6 +258,7 @@ public class RayletClientImpl implements RayletClient { actorHandleIdOffset, actorCounter, false, + newActorHandlesOffset, argsOffset, returnsOffset, requiredResourcesOffset, diff --git a/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java b/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java index 8988c933f..fd919f625 100644 --- a/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java +++ b/java/runtime/src/main/java/org/ray/runtime/task/TaskSpec.java @@ -40,6 +40,9 @@ public class TaskSpec { // Number of tasks that have been submitted to this actor so far. public final int actorCounter; + // Task arguments. + public final UniqueId[] newActorHandles; + // Task arguments. public final FunctionArg[] args; @@ -76,6 +79,8 @@ public class TaskSpec { this.actorId = actorId; this.actorHandleId = actorHandleId; this.actorCounter = actorCounter; + // TODO: Initialize the new actor handles. + this.newActorHandles = new UniqueId[] {}; this.args = args; this.returnIds = returnIds; this.resources = resources; diff --git a/python/ray/actor.py b/python/ray/actor.py index a29bb765a..54baeddb0 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -485,6 +485,10 @@ class ActorHandle(object): _ray_actor_driver_id: The driver ID of the job that created the actor (it is possible that this ActorHandle exists on a driver with a different driver ID). + _ray_new_actor_handles: The new actor handles that were created from + this handle since the last task on this handle was submitted. This + is used to garbage-collect dummy objects that are no longer + necessary in the backend. """ def __init__(self, @@ -520,6 +524,7 @@ class ActorHandle(object): actor_creation_dummy_object_id) self._ray_actor_method_cpus = actor_method_cpus self._ray_actor_driver_id = actor_driver_id + self._ray_new_actor_handles = [] def _actor_method_call(self, method_name, @@ -585,6 +590,7 @@ class ActorHandle(object): actor_creation_dummy_object_id=( self._ray_actor_creation_dummy_object_id), execution_dependencies=execution_dependencies, + new_actor_handles=self._ray_new_actor_handles, # We add one for the dummy return ID. num_return_vals=num_return_vals + 1, resources={"CPU": self._ray_actor_method_cpus}, @@ -596,6 +602,9 @@ class ActorHandle(object): # The last object returned is the dummy object that should be # passed in to the next actor method. Do not return it to the user. self._ray_actor_cursor = object_ids.pop() + # We have notified the backend of the new actor handles to expect since + # the last task was submitted, so clear the list. + self._ray_new_actor_handles = [] if len(object_ids) == 1: object_ids = object_ids[0] @@ -702,6 +711,19 @@ class ActorHandle(object): if ray_forking: self._ray_actor_forks += 1 + new_actor_handle_id = actor_handle_id + else: + # The execution dependency for a pickled actor handle is never safe + # to release, since it could be unpickled and submit another + # dependent task at any time. Therefore, we notify the backend of a + # random handle ID that will never actually be used. + new_actor_handle_id = ray.ObjectID(_random_string()) + # Notify the backend to expect this new actor handle. The backend will + # not release the cursor for any new handles until the first task for + # each of the new handles is submitted. + # NOTE(swang): There is currently no garbage collection for actor + # handles until the actor itself is removed. + self._ray_new_actor_handles.append(new_actor_handle_id) return state diff --git a/python/ray/experimental/named_actors.py b/python/ray/experimental/named_actors.py index 54deddd1f..a3f2f66bb 100644 --- a/python/ray/experimental/named_actors.py +++ b/python/ray/experimental/named_actors.py @@ -56,5 +56,8 @@ def register_actor(name, actor_handle): # Add the actor to Redis if it does not already exist. already_exists = _internal_kv_put(actor_name, pickled_state) if already_exists: + # If the registration fails, then erase the new actor handle that + # was added when pickling the actor handle. + actor_handle._ray_new_actor_handles.pop() raise ValueError( "Error: the actor with name={} already exists".format(name)) diff --git a/python/ray/worker.py b/python/ray/worker.py index 97ad4e04a..f43f12206 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -524,6 +524,7 @@ class Worker(object): actor_creation_dummy_object_id=None, max_actor_reconstructions=0, execution_dependencies=None, + new_actor_handles=None, num_return_vals=None, resources=None, placement_resources=None, @@ -594,6 +595,9 @@ class Worker(object): if execution_dependencies is None: execution_dependencies = [] + if new_actor_handles is None: + new_actor_handles = [] + if driver_id is None: driver_id = self.task_driver_id @@ -628,8 +632,8 @@ class Worker(object): num_return_vals, self.current_task_id, task_index, actor_creation_id, actor_creation_dummy_object_id, max_actor_reconstructions, actor_id, actor_handle_id, - actor_counter, execution_dependencies, resources, - placement_resources) + actor_counter, new_actor_handles, execution_dependencies, + resources, placement_resources) self.raylet_client.submit_task(task) return task.returns() @@ -1944,7 +1948,7 @@ def connect(ray_params, worker.current_task_id, worker.task_index, ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID), 0, ray.ObjectID(NIL_ACTOR_ID), ray.ObjectID(NIL_ACTOR_ID), - nil_actor_counter, [], {"CPU": 0}, {}) + nil_actor_counter, [], [], {"CPU": 0}, {}) # Add the driver task to the task table. global_state._execute_command(driver_task.task_id(), "RAY.TABLE_ADD", diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs index df4077a45..74f721b6e 100644 --- a/src/ray/gcs/format/gcs.fbs +++ b/src/ray/gcs/format/gcs.fbs @@ -73,6 +73,10 @@ table TaskInfo { actor_counter: int; // True if this task is an actor checkpoint task and false otherwise. is_actor_checkpoint_method: bool; + // If this is an actor task, then this will be populated with all of the new + // actor handles that were forked from this handle since the last task on + // this handle was submitted. + new_actor_handles: [string]; // Task arguments. args: [Arg]; // Object IDs of return values. diff --git a/src/ray/raylet/actor_registration.cc b/src/ray/raylet/actor_registration.cc index 5c9b32228..0e95e200a 100644 --- a/src/ray/raylet/actor_registration.cc +++ b/src/ray/raylet/actor_registration.cc @@ -40,13 +40,39 @@ const std::unordered_map return frontier_; } -void ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id, - const ObjectID &execution_dependency) { +ObjectID ActorRegistration::ExtendFrontier(const ActorHandleID &handle_id, + const ObjectID &execution_dependency) { auto &frontier_entry = frontier_[handle_id]; + // Release the reference to the previous cursor for this + // actor handle, if there was one. + ObjectID object_to_release; + if (!frontier_entry.execution_dependency.is_nil()) { + auto it = dummy_objects_.find(frontier_entry.execution_dependency); + RAY_CHECK(it != dummy_objects_.end()); + it->second--; + RAY_CHECK(it->second >= 0); + if (it->second == 0) { + object_to_release = frontier_entry.execution_dependency; + dummy_objects_.erase(it); + } + } + frontier_entry.task_counter++; frontier_entry.execution_dependency = execution_dependency; execution_dependency_ = execution_dependency; - dummy_objects_.push_back(execution_dependency); + // Add the reference to the new cursor for this actor handle. + dummy_objects_[execution_dependency]++; + return object_to_release; +} + +void ActorRegistration::AddHandle(const ActorHandleID &handle_id, + const ObjectID &execution_dependency) { + if (frontier_.find(handle_id) == frontier_.end()) { + auto &new_handle = frontier_[handle_id]; + new_handle.task_counter = 0; + new_handle.execution_dependency = execution_dependency; + dummy_objects_[execution_dependency]++; + } } int ActorRegistration::NumHandles() const { return frontier_.size(); } diff --git a/src/ray/raylet/actor_registration.h b/src/ray/raylet/actor_registration.h index 9c4664455..d56090103 100644 --- a/src/ray/raylet/actor_registration.h +++ b/src/ray/raylet/actor_registration.h @@ -89,7 +89,9 @@ class ActorRegistration { const std::unordered_map &GetFrontier() const; /// Get all the dummy objects of this actor's tasks. - const std::vector &GetDummyObjects() const { return dummy_objects_; } + const std::unordered_map &GetDummyObjects() const { + return dummy_objects_; + } /// Extend the frontier of the actor by a single task. This should be called /// whenever the actor executes a task. @@ -97,8 +99,20 @@ class ActorRegistration { /// \param handle_id The ID of the handle that submitted the task. /// \param execution_dependency The object representing the actor's new /// state. This is the execution dependency returned by the task. - void ExtendFrontier(const ActorHandleID &handle_id, - const ObjectID &execution_dependency); + /// \return The dummy object that can be released as a result of the executed + /// task. If no dummy object can be released, then this is nil. + ObjectID ExtendFrontier(const ActorHandleID &handle_id, + const ObjectID &execution_dependency); + + /// Add a new handle to the actor frontier. This does nothing if the actor + /// handle already exists. + /// + /// \param handle_id The ID of the handle to add. + /// \param execution_dependency This is the expected execution dependency for + /// the first task submitted on the new handle. If the new handle hasn't been + /// seen yet, then this dependency will be added to the actor frontier and is + /// not safe to release until the first task has been submitted. + void AddHandle(const ActorHandleID &handle_id, const ObjectID &execution_dependency); /// Returns num handles to this actor entry. /// @@ -117,9 +131,24 @@ class ActorRegistration { /// executed so far and which tasks may execute next, based on execution /// dependencies. This is indexed by handle. std::unordered_map frontier_; - - /// All of the dummy object IDs from this actor's tasks. - std::vector dummy_objects_; + /// This map is used to track all the unreleased dummy objects for this + /// actor. The map key is the dummy object ID, and the map value is the + /// number of actor handles that depend on that dummy object. When the map + /// value decreases to 0, the dummy object is safe to release from the object + /// manager, since this means that no actor handle will depend on that dummy + /// object again. + /// + /// An actor handle depends on a dummy object when its next unfinished task + /// depends on the dummy object. For a given dummy object (say D) created by + /// task (say T) that was submitted by an actor handle (say H), there could + /// be 2 types of such actor handles: + /// 1. T is the last task submitted by H that was executed. If the next task + /// submitted by H hasn't finished yet, then H still depends on D since D + /// will be in the next task's execution dependencies. + /// 2. Any handles that were forked from H after T finished, and before T's + /// next task finishes. Such handles depend on D until their first tasks + /// finish since D will be their first tasks' execution dependencies. + std::unordered_map dummy_objects_; }; } // namespace raylet diff --git a/src/ray/raylet/lib/python/common_extension.cc b/src/ray/raylet/lib/python/common_extension.cc index d986427cd..b8a51787e 100644 --- a/src/ray/raylet/lib/python/common_extension.cc +++ b/src/ray/raylet/lib/python/common_extension.cc @@ -406,6 +406,7 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { // Max number of times to reconstruct this actor (only used for actor creation // task). int32_t max_actor_reconstructions; + PyObject *new_actor_handles; // Arguments of the task that are execution-dependent. These must be // PyObjectIDs). PyObject *execution_arguments = nullptr; @@ -413,17 +414,16 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { PyObject *resource_map = nullptr; // Dictionary of required placement resources for this task. PyObject *placement_resource_map = nullptr; - // Function descriptor. std::vector function_descriptor; if (!PyArg_ParseTuple( - args, "O&O&OiO&i|O&O&iO&O&iOOOi", &PyObjectToUniqueID, &driver_id, + args, "O&O&OiO&i|O&O&iO&O&iOOOOi", &PyObjectToUniqueID, &driver_id, &PyListStringToStringVector, &function_descriptor, &arguments, &num_returns, &PyObjectToUniqueID, &parent_task_id, &parent_counter, &PyObjectToUniqueID, &actor_creation_id, &PyObjectToUniqueID, &actor_creation_dummy_object_id, &max_actor_reconstructions, &PyObjectToUniqueID, &actor_id, &PyObjectToUniqueID, - &actor_handle_id, &actor_counter, &execution_arguments, &resource_map, - &placement_resource_map, &language)) { + &actor_handle_id, &actor_counter, &new_actor_handles, &execution_arguments, + &resource_map, &placement_resource_map, &language)) { return -1; } @@ -450,8 +450,6 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { Py_ssize_t num_args = PyList_Size(arguments); - self->task_spec = nullptr; - // Create the task spec. // Parse the arguments from the list. @@ -471,11 +469,23 @@ static int PyTask_init(PyTask *self, PyObject *args, PyObject *kwds) { } } + std::vector task_new_actor_handles; + Py_ssize_t num_new_actor_handles = PyList_Size(new_actor_handles); + for (Py_ssize_t i = 0; i < num_new_actor_handles; ++i) { + PyObject *new_actor_handle = PyList_GetItem(new_actor_handles, i); + if (!PyObject_IsInstance(new_actor_handle, (PyObject *)&PyObjectIDType)) { + PyErr_SetString(PyExc_TypeError, "New actor handles must be a ray.ObjectID."); + return -1; + } + task_new_actor_handles.push_back(((PyObjectID *)new_actor_handle)->object_id); + } + self->task_spec = new ray::raylet::TaskSpecification( driver_id, parent_task_id, parent_counter, actor_creation_id, actor_creation_dummy_object_id, max_actor_reconstructions, actor_id, - actor_handle_id, actor_counter, task_args, num_returns, required_resources, - required_placement_resources, Language::PYTHON, function_descriptor); + actor_handle_id, actor_counter, task_new_actor_handles, task_args, num_returns, + required_resources, required_placement_resources, Language::PYTHON, + function_descriptor); /* Set the task's execution dependencies. */ self->execution_dependencies = new std::vector(); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 01bcedd99..f2f3ecb41 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -11,31 +11,24 @@ namespace { #define RAY_CHECK_ENUM(x, y) \ static_assert(static_cast(x) == static_cast(y), "protocol mismatch") -/// A helper function to determine whether a given actor task has already been executed -/// according to the given actor registry. Returns true if the task is a duplicate. -bool CheckDuplicateActorTask( +/// A helper function to return the expected actor counter for a given actor +/// and actor handle, according to the given actor registry. If a task's +/// counter is less than the returned value, then the task is a duplicate. If +/// the task's counter is equal to the returned value, then the task should be +/// the next to run. +int64_t GetExpectedTaskCounter( const std::unordered_map &actor_registry, - const ray::raylet::TaskSpecification &spec) { - auto actor_entry = actor_registry.find(spec.ActorId()); + const ray::ActorID &actor_id, const ray::ActorHandleID &actor_handle_id) { + auto actor_entry = actor_registry.find(actor_id); RAY_CHECK(actor_entry != actor_registry.end()); const auto &frontier = actor_entry->second.GetFrontier(); int64_t expected_task_counter = 0; - auto frontier_entry = frontier.find(spec.ActorHandleId()); + auto frontier_entry = frontier.find(actor_handle_id); if (frontier_entry != frontier.end()) { expected_task_counter = frontier_entry->second.task_counter; } - if (spec.ActorCounter() < expected_task_counter) { - // The assigned task counter is less than expected. The actor has already - // executed past this task, so do not assign the task again. - RAY_LOG(WARNING) << "A task was resubmitted, so we are ignoring it. This " - << "should only happen during reconstruction."; - return true; - } - RAY_CHECK(spec.ActorCounter() == expected_task_counter) - << "Expected actor counter: " << expected_task_counter - << ", got: " << spec.ActorCounter(); - return false; + return expected_task_counter; }; } // namespace @@ -748,8 +741,8 @@ void NodeManager::HandleDisconnectedActor(const ActorID &actor_id, bool was_loca if (was_local) { // Clean up the dummy objects from this actor. RAY_LOG(DEBUG) << "Removing dummy objects for actor: " << actor_id; - for (auto &id : actor_entry->second.GetDummyObjects()) { - HandleObjectMissing(id); + for (auto &dummy_object_pair : actor_entry->second.GetDummyObjects()) { + HandleObjectMissing(dummy_object_pair.first); } } // Update the actor's state. @@ -1250,9 +1243,25 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag // If this actor is alive, check whether this actor is local. auto node_manager_id = actor_entry->second.GetNodeManagerId(); if (node_manager_id == gcs_client_->client_table().GetLocalClientId()) { - // If this actor is local, queue the task for local execution, bypassing - // placement. - EnqueuePlaceableTask(task); + // The actor is local. + int64_t expected_task_counter = GetExpectedTaskCounter( + actor_registry_, spec.ActorId(), spec.ActorHandleId()); + if (spec.ActorCounter() < expected_task_counter) { + // A task that has already been executed before has been found. The + // task will be treated as failed if at least one of the task's + // return values have been evicted, to prevent the application from + // hanging. + // TODO(swang): Clean up the task from the lineage cache? If the + // task is not marked as failed, then it may never get marked as + // ready to flush to the GCS. + RAY_LOG(WARNING) << "A task was resubmitted, so we are ignoring it. This " + << "should only happen during reconstruction."; + TreatTaskAsFailedIfLost(task); + } else { + // The task has not yet been executed. Queue the task for local + // execution, bypassing placement. + EnqueuePlaceableTask(task); + } } else { // The actor is remote. Forward the task to the node manager that owns // the actor. @@ -1454,14 +1463,13 @@ bool NodeManager::AssignTask(const Task &task) { // If this is an actor task, check that the new task has the correct counter. if (spec.IsActorTask()) { - if (CheckDuplicateActorTask(actor_registry_, spec)) { - // The actor is alive, and a task that has already been executed before - // has been found. The task will be treated as failed if at least one of - // the task's return values have been evicted, to prevent the application - // from hanging. - TreatTaskAsFailedIfLost(task); - return true; - } + // An actor task should only be ready to be assigned if it matches the + // expected task counter. + int64_t expected_task_counter = + GetExpectedTaskCounter(actor_registry_, spec.ActorId(), spec.ActorHandleId()); + RAY_CHECK(spec.ActorCounter() == expected_task_counter) + << "Expected actor counter: " << expected_task_counter << ", task " + << spec.TaskId() << " has: " << spec.ActorCounter(); } // Try to get an idle worker that can execute this task. @@ -1519,11 +1527,34 @@ bool NodeManager::AssignTask(const Task &task) { // We successfully assigned the task to the worker. worker->AssignTaskId(spec.TaskId()); worker->AssignDriverId(spec.DriverId()); - // If the task was an actor task, then record this execution to guarantee - // consistency in the case of reconstruction. + // Actor tasks require extra accounting to track the actor's state. if (spec.IsActorTask()) { auto actor_entry = actor_registry_.find(spec.ActorId()); RAY_CHECK(actor_entry != actor_registry_.end()); + // Process any new actor handles that were created since the + // previous task on this handle was executed. The first task + // submitted on a new actor handle will depend on the dummy object + // returned by the previous task, so the dependency will not be + // released until this first task is submitted. + for (auto &new_handle_id : spec.NewActorHandles()) { + // Get the execution dependency for the first task submitted on the new + // actor handle. Since the new actor handle was created after this task + // began and before this task finished, it must have the same execution + // dependency. + const auto &execution_dependencies = + assigned_task.GetTaskExecutionSpec().ExecutionDependencies(); + // TODO(swang): We expect this task to have exactly 1 execution dependency, + // the dummy object returned by the previous actor task. However, this + // leaks information about the TaskExecutionSpecification implementation. + RAY_CHECK(execution_dependencies.size() == 1); + const ObjectID &execution_dependency = execution_dependencies.front(); + // Add the new handle and give it a reference to the finished task's + // execution dependency. + actor_entry->second.AddHandle(new_handle_id, execution_dependency); + } + + // If the task was an actor task, then record this execution to + // guarantee consistency in the case of reconstruction. auto execution_dependency = actor_entry->second.GetExecutionDependency(); // The execution dependency is initialized to the actor creation task's // return value, and is subsequently updated to the assigned tasks' @@ -1539,7 +1570,10 @@ bool NodeManager::AssignTask(const Task &task) { // (SetExecutionDependencies takes a non-const so copy task in a // on-const variable.) assigned_task.SetExecutionDependencies({execution_dependency}); + } else { + RAY_CHECK(spec.NewActorHandles().empty()); } + // We started running the task, so the task is ready to write to GCS. if (!lineage_cache_.AddReadyTask(assigned_task)) { RAY_LOG(WARNING) << "Task " << spec.TaskId() << " already in lineage cache." @@ -1577,8 +1611,36 @@ void NodeManager::FinishAssignedTask(Worker &worker) { // (See design_docs/task_states.rst for the state transition diagram.) const auto task = local_queues_.RemoveTask(task_id); + // Release task's resources. The worker's lifetime resources are still held. + auto const &task_resources = worker.GetTaskResourceIds(); + local_available_resources_.Release(task_resources); + RAY_CHECK(cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release( + task_resources.ToResourceSet())); + worker.ResetTaskResourceIds(); + + // If this was an actor or actor creation task, handle the actor's new state. + if (task.GetTaskSpecification().IsActorCreationTask() || + task.GetTaskSpecification().IsActorTask()) { + FinishAssignedActorTask(worker, task); + } + + // Notify the task dependency manager that this task has finished execution. + task_dependency_manager_.TaskCanceled(task_id); + + // Unset the worker's assigned task. + worker.AssignTaskId(TaskID::nil()); + // Unset the worker's assigned driver Id if this is not an actor. + if (!task.GetTaskSpecification().IsActorCreationTask() && + !task.GetTaskSpecification().IsActorTask()) { + worker.AssignDriverId(DriverID::nil()); + } +} + +void NodeManager::FinishAssignedActorTask(Worker &worker, const Task &task) { + // If this was an actor creation task, then convert the worker to an actor + // and notify the other node managers. if (task.GetTaskSpecification().IsActorCreationTask()) { - // If this was an actor creation task, then convert the worker to an actor. + // Convert the worker to an actor. auto actor_id = task.GetTaskSpecification().ActorCreationId(); worker.AssignActorId(actor_id); // Publish the actor creation event to all other nodes so that methods for @@ -1622,54 +1684,38 @@ void NodeManager::FinishAssignedTask(Worker &worker) { // Only one node at a time should succeed at creating the actor. RAY_LOG(FATAL) << "Failed to update state to ALIVE for actor " << id; }); + } + + // Update the actor's frontier. + ActorID actor_id; + ActorHandleID actor_handle_id; + if (task.GetTaskSpecification().IsActorCreationTask()) { + actor_id = task.GetTaskSpecification().ActorCreationId(); + actor_handle_id = ActorHandleID::nil(); } else { - // Release task's resources. - local_available_resources_.Release(worker.GetTaskResourceIds()); - worker.ResetTaskResourceIds(); - - RAY_CHECK( - cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release( - task.GetTaskSpecification().GetRequiredResources())); + actor_id = task.GetTaskSpecification().ActorId(); + actor_handle_id = task.GetTaskSpecification().ActorHandleId(); } - - // If the finished task was an actor task, mark the returned dummy object as - // locally available. This is not added to the object table, so the update - // will be invisible to both the local object manager and the other nodes. - if (task.GetTaskSpecification().IsActorCreationTask() || - task.GetTaskSpecification().IsActorTask()) { - ActorID actor_id; - ActorHandleID actor_handle_id; - if (task.GetTaskSpecification().IsActorCreationTask()) { - actor_id = task.GetTaskSpecification().ActorCreationId(); - actor_handle_id = ActorHandleID::nil(); - } else { - actor_id = task.GetTaskSpecification().ActorId(); - actor_handle_id = task.GetTaskSpecification().ActorHandleId(); - } - auto actor_entry = actor_registry_.find(actor_id); - RAY_CHECK(actor_entry != actor_registry_.end()); - auto dummy_object = task.GetTaskSpecification().ActorDummyObject(); - // Extend the actor's frontier to include the executed task. - actor_entry->second.ExtendFrontier(actor_handle_id, dummy_object); - // Mark the dummy object as locally available to indicate that the actor's - // state has changed and the next method can run. - // NOTE(swang): The dummy objects must be marked as local whenever - // ExtendFrontier is called, and vice versa, so that we can clean up the - // dummy objects properly in case the actor fails and needs to be - // reconstructed. - HandleObjectLocal(dummy_object); - } - - // Notify the task dependency manager that this task has finished execution. - task_dependency_manager_.TaskCanceled(task_id); - - // Unset the worker's assigned task. - worker.AssignTaskId(TaskID::nil()); - // Unset the worker's assigned driver Id if this is not an actor. - if (!task.GetTaskSpecification().IsActorCreationTask() && - !task.GetTaskSpecification().IsActorTask()) { - worker.AssignDriverId(DriverID::nil()); + auto actor_entry = actor_registry_.find(actor_id); + RAY_CHECK(actor_entry != actor_registry_.end()); + // Extend the actor's frontier to include the executed task. + const auto dummy_object = task.GetTaskSpecification().ActorDummyObject(); + const ObjectID object_to_release = + actor_entry->second.ExtendFrontier(actor_handle_id, dummy_object); + if (!object_to_release.is_nil()) { + // If there were no new actor handles created, then no other actor task + // will depend on this execution dependency, so it safe to release. + HandleObjectMissing(object_to_release); } + // Mark the dummy object as locally available to indicate that the actor's + // state has changed and the next method can run. This is not added to the + // object table, so the update will be invisible to both the local object + // manager and the other nodes. + // NOTE(swang): The dummy objects must be marked as local whenever + // ExtendFrontier is called, and vice versa, so that we can clean up the + // dummy objects properly in case the actor fails and needs to be + // reconstructed. + HandleObjectLocal(dummy_object); } void NodeManager::HandleTaskReconstruction(const TaskID &task_id) { diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index d8502e7d8..e60dc4e2a 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -181,9 +181,14 @@ class NodeManager { bool AssignTask(const Task &task); /// Handle a worker finishing its assigned task. /// - /// \param The worker that fiished the task. + /// \param worker The worker that finished the task. /// \return Void. void FinishAssignedTask(Worker &worker); + /// Handle a worker finishing an assigned actor task or actor creation task. + /// \param worker The worker that finished the task. + /// \param task The actor task or actor creationt ask. + /// \return Void. + void FinishAssignedActorTask(Worker &worker, const Task &task); /// Make a placement decision for placeable tasks given the resource_map /// provided. This will perform task state transitions and task forwarding. /// diff --git a/src/ray/raylet/task_spec.cc b/src/ray/raylet/task_spec.cc index f4b0b32fc..60d6d04e9 100644 --- a/src/ray/raylet/task_spec.cc +++ b/src/ray/raylet/task_spec.cc @@ -62,7 +62,7 @@ TaskSpecification::TaskSpecification( const std::unordered_map &required_resources, const Language &language, const std::vector &function_descriptor) : TaskSpecification(driver_id, parent_task_id, parent_counter, ActorID::nil(), - ObjectID::nil(), 0, ActorID::nil(), ActorHandleID::nil(), -1, + ObjectID::nil(), 0, ActorID::nil(), ActorHandleID::nil(), -1, {}, task_arguments, num_returns, required_resources, std::unordered_map(), language, function_descriptor) {} @@ -72,6 +72,7 @@ TaskSpecification::TaskSpecification( const ActorID &actor_creation_id, const ObjectID &actor_creation_dummy_object_id, const int64_t max_actor_reconstructions, const ActorID &actor_id, const ActorHandleID &actor_handle_id, int64_t actor_counter, + const std::vector &new_actor_handles, const std::vector> &task_arguments, int64_t num_returns, const std::unordered_map &required_resources, const std::unordered_map &required_placement_resources, @@ -100,8 +101,8 @@ TaskSpecification::TaskSpecification( to_flatbuf(fbb, parent_task_id), parent_counter, to_flatbuf(fbb, actor_creation_id), to_flatbuf(fbb, actor_creation_dummy_object_id), max_actor_reconstructions, to_flatbuf(fbb, actor_id), to_flatbuf(fbb, actor_handle_id), actor_counter, false, - fbb.CreateVector(arguments), fbb.CreateVector(returns), - map_to_flatbuf(fbb, required_resources), + to_flatbuf(fbb, new_actor_handles), fbb.CreateVector(arguments), + fbb.CreateVector(returns), map_to_flatbuf(fbb, required_resources), map_to_flatbuf(fbb, required_placement_resources), language, string_vec_to_flatbuf(fbb, function_descriptor)); fbb.Finish(spec); @@ -263,6 +264,11 @@ ObjectID TaskSpecification::ActorDummyObject() const { return ReturnId(NumReturns() - 1); } +std::vector TaskSpecification::NewActorHandles() const { + auto message = flatbuffers::GetRoot(spec_.data()); + return from_flatbuf(*message->new_actor_handles()); +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/task_spec.h b/src/ray/raylet/task_spec.h index e517af71c..3c33d56aa 100644 --- a/src/ray/raylet/task_spec.h +++ b/src/ray/raylet/task_spec.h @@ -133,6 +133,7 @@ class TaskSpecification { const ActorID &actor_creation_id, const ObjectID &actor_creation_dummy_object_id, int64_t max_actor_reconstructions, const ActorID &actor_id, const ActorHandleID &actor_handle_id, int64_t actor_counter, + const std::vector &new_actor_handles, const std::vector> &task_arguments, int64_t num_returns, const std::unordered_map &required_resources, @@ -200,6 +201,7 @@ class TaskSpecification { ActorHandleID ActorHandleId() const; int64_t ActorCounter() const; ObjectID ActorDummyObject() const; + std::vector NewActorHandles() const; private: /// Assign the specification data from a pointer. diff --git a/src/ray/raylet/worker_pool_test.cc b/src/ray/raylet/worker_pool_test.cc index d235d0244..4faa3e99b 100644 --- a/src/ray/raylet/worker_pool_test.cc +++ b/src/ray/raylet/worker_pool_test.cc @@ -66,9 +66,9 @@ static inline TaskSpecification ExampleTaskSpec( const ActorID actor_id = ActorID::nil(), const Language &language = Language::PYTHON) { std::vector function_descriptor(3); - return TaskSpecification(UniqueID::nil(), UniqueID::nil(), 0, ActorID::nil(), - ObjectID::nil(), 0, actor_id, ActorHandleID::nil(), 0, {}, 0, - {{}}, {{}}, language, function_descriptor); + return TaskSpecification(UniqueID::nil(), TaskID::nil(), 0, ActorID::nil(), + ObjectID::nil(), 0, actor_id, ActorHandleID::nil(), 0, {}, {}, + 0, {{}}, {{}}, language, function_descriptor); } TEST_F(WorkerPoolTest, HandleWorkerRegistration) { diff --git a/test/actor_test.py b/test/actor_test.py index d4b386ed0..c9bc9c3af 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1854,8 +1854,87 @@ def test_fork_consistency(setup_queue_actor): # Fork num_iters times. num_forks = 10 num_items_per_fork = 100 - ray.get( - [fork.remote(queue, i, num_items_per_fork) for i in range(num_forks)]) + + # Submit some tasks on new actor handles. + forks = [ + fork.remote(queue, i, num_items_per_fork) for i in range(num_forks) + ] + # Submit some more tasks on the original actor handle. + for item in range(num_items_per_fork): + local_fork = queue.enqueue.remote(num_forks, item) + forks.append(local_fork) + # Wait for tasks from all handles to complete. + ray.get(forks) + # Check that all tasks from all handles have completed. + items = ray.get(queue.read.remote()) + for i in range(num_forks + 1): + filtered_items = [item[1] for item in items if item[0] == i] + assert filtered_items == list(range(num_items_per_fork)) + + +def test_pickled_handle_consistency(setup_queue_actor): + queue = setup_queue_actor + + @ray.remote + def fork(pickled_queue, key, num_items): + queue = ray.worker.pickle.loads(pickled_queue) + x = None + for item in range(num_items): + x = queue.enqueue.remote(key, item) + return ray.get(x) + + # Fork num_iters times. + num_forks = 10 + num_items_per_fork = 100 + + # Submit some tasks on the pickled actor handle. + new_queue = ray.worker.pickle.dumps(queue) + forks = [ + fork.remote(new_queue, i, num_items_per_fork) for i in range(num_forks) + ] + # Submit some more tasks on the original actor handle. + for item in range(num_items_per_fork): + local_fork = queue.enqueue.remote(num_forks, item) + forks.append(local_fork) + # Wait for tasks from all handles to complete. + ray.get(forks) + # Check that all tasks from all handles have completed. + items = ray.get(queue.read.remote()) + for i in range(num_forks + 1): + filtered_items = [item[1] for item in items if item[0] == i] + assert filtered_items == list(range(num_items_per_fork)) + + +def test_nested_fork(setup_queue_actor): + queue = setup_queue_actor + + @ray.remote + def fork(queue, key, num_items): + x = None + for item in range(num_items): + x = queue.enqueue.remote(key, item) + return ray.get(x) + + @ray.remote + def nested_fork(queue, key, num_items): + # Pass the actor into a nested task. + ray.get(fork.remote(queue, key + 1, num_items)) + x = None + for item in range(num_items): + x = queue.enqueue.remote(key, item) + return ray.get(x) + + # Fork num_iters times. + num_forks = 10 + num_items_per_fork = 100 + + # Submit some tasks on new actor handles. + forks = [ + nested_fork.remote(queue, i, num_items_per_fork) + for i in range(0, num_forks, 2) + ] + ray.get(forks) + # Check that all tasks from all handles have completed. items = ray.get(queue.read.remote()) for i in range(num_forks): filtered_items = [item[1] for item in items if item[0] == i]