diff --git a/python/ray/actor.py b/python/ray/actor.py index eba1dd48c..74d4f3853 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -155,6 +155,9 @@ def make_actor_method_executor(worker, method_name, method): if not actor_checkpoint_failed: put_dummy_object(worker, dummy_return_id) worker.actor_task_counter = task_counter + 1 + # Once the actor has resumed from a checkpoint, it counts as + # loaded. + worker.actor_loaded = True # Report to the local scheduler whether this task succeeded in # loading the checkpoint. worker.actor_checkpoint_failed = actor_checkpoint_failed @@ -168,6 +171,8 @@ def make_actor_method_executor(worker, method_name, method): # case the method throws an exception. put_dummy_object(worker, dummy_return_id) worker.actor_task_counter = task_counter + 1 + # Once the actor executes a task, it counts as loaded. + worker.actor_loaded = True # Execute the actor method. return method(actor, *args) return actor_method_executor @@ -408,9 +413,9 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): error_to_return = None # Save or resume the checkpoint. - if previous_object_id in worker.actor_pinned_objects: - # The preceding task executed on this actor instance. Save the - # checkpoint. + if worker.actor_loaded: + # The actor has loaded, so we are running the normal execution. + # Save the checkpoint. print("Saving actor checkpoint. actor_counter = {}." .format(task_counter)) actor_key = b"Actor:" + worker.actor_id @@ -437,8 +442,8 @@ def make_actor(cls, num_cpus, num_gpus, checkpoint_interval): # so we still consider the task successful. error_to_return = error else: - # The preceding task has not yet executed on this actor - # instance. Try to resume from the most recent checkpoint. + # The actor has not yet loaded. Try loading it from the most + # recent checkpoint. checkpoint_index, checkpoint = get_actor_checkpoint( worker, worker.actor_id) if checkpoint_index == task_counter: diff --git a/python/ray/worker.py b/python/ray/worker.py index d18df9bea..6139e3c54 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -227,6 +227,10 @@ class Worker(object): self.make_actor = None self.actors = {} self.actor_task_counter = 0 + # Whether an actor instance has been loaded yet. The actor counts as + # loaded once it has either executed its first task or successfully + # resumed from a checkpoint. + self.actor_loaded = False # This field is used to report actor checkpoint failure for the last # task assigned. Workers are not assigned a task on startup, so we # initialize to False. diff --git a/src/common/task.cc b/src/common/task.cc index a78c16ac3..7d90a72e0 100644 --- a/src/common/task.cc +++ b/src/common/task.cc @@ -214,6 +214,10 @@ ActorID TaskSpec_actor_id(TaskSpec *spec) { return from_flatbuf(message->actor_id()); } +bool TaskSpec_is_actor_task(TaskSpec *spec) { + return !ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID); +} + int64_t TaskSpec_actor_counter(TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); @@ -227,6 +231,19 @@ bool TaskSpec_actor_is_checkpoint_method(TaskSpec *spec) { return actor_counter < 0; } +bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index) { + if (TaskSpec_actor_counter(spec) == 0) { + /* The first task does not have any dependencies. */ + return false; + } else if (TaskSpec_actor_is_checkpoint_method(spec)) { + /* Checkpoint tasks do not have any dependencies. */ + return false; + } else { + /* For all other tasks, the last argument is the dummy object. */ + return arg_index == (TaskSpec_num_args(spec) - 1); + } +} + UniqueID TaskSpec_driver_id(TaskSpec *spec) { CHECK(spec); auto message = flatbuffers::GetRoot(spec); diff --git a/src/common/task.h b/src/common/task.h index ce2590a1e..ee06de2ed 100644 --- a/src/common/task.h +++ b/src/common/task.h @@ -126,6 +126,14 @@ FunctionID TaskSpec_function(TaskSpec *spec); */ UniqueID TaskSpec_actor_id(TaskSpec *spec); +/** + * Return whether this task is for an actor. + * + * @param spec The task_spec in question. + * @return Whether the task is for an actor. + */ +bool TaskSpec_is_actor_task(TaskSpec *spec); + /** * Return the actor counter of the task. This starts at 0 and increments by 1 * every time a new task is submitted to run on the actor. @@ -135,8 +143,24 @@ UniqueID TaskSpec_actor_id(TaskSpec *spec); */ int64_t TaskSpec_actor_counter(TaskSpec *spec); +/** + * Return whether the task is a checkpoint method execution. + * + * @param spec The task_spec in question. + * @return Whether the task is a checkpoint method. + */ bool TaskSpec_actor_is_checkpoint_method(TaskSpec *spec); +/** + * Return whether the task's argument is a dummy object. Dummy objects are used + * to encode an actor's state dependencies in the task graph. + * + * @param spec The task_spec in question. + * @param arg_index The index of the argument in question. + * @return Whether the argument at arg_index is a dummy object. + */ +bool TaskSpec_arg_is_actor_dummy_object(TaskSpec *spec, int64_t arg_index); + /** * Return the driver ID of the task. * diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 66343f134..57965ba8d 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -42,6 +42,10 @@ struct ObjectEntry { * to the corresponding task's queue entry in waiting queue, for fast * deletion when all of the task's dependencies become available. */ std::vector::iterator> dependent_tasks; + /** Whether or not to request a transfer of this object. This should be set + * to true for all objects except for actor dummy objects, where the object + * must be generated by executing the task locally. */ + bool request_transfer; }; /** This struct contains information about a specific actor. This struct will be @@ -57,6 +61,12 @@ typedef struct { * currently assigned. If the actor process reports back success for the * assigned task execution, task_counter should be set to this value. */ int64_t assigned_task_counter; + /** Whether the actor process has loaded yet. The actor counts as loaded once + * it has either executed its first task or successfully resumed from a + * checkpoint. Before the actor has loaded, we may dispatch the first task + * or any checkpoint tasks. After it has loaded, we may only dispatch tasks + * in order. */ + bool loaded; /** A queue of tasks to be executed on this actor. The tasks will be sorted by * the order of their actor counters. */ std::list *task_queue; @@ -242,6 +252,7 @@ void create_actor(SchedulingAlgorithmState *algorithm_state, entry.task_queue = new std::list(); entry.worker = worker; entry.worker_available = false; + entry.loaded = false; CHECK(algorithm_state->local_actor_infos.count(actor_id) == 0) algorithm_state->local_actor_infos[actor_id] = entry; @@ -319,31 +330,22 @@ bool dispatch_actor_task(LocalSchedulerState *state, return false; } - /* Find the first task that either matches the task counter or that is a - * checkpoint method. Remove any tasks that we have already executed past - * (e.g., by executing a more recent checkpoint method). */ + /* Check whether we can execute the first task in the queue. */ auto task = entry.task_queue->begin(); int64_t next_task_counter = TaskSpec_actor_counter(task->spec); - while (next_task_counter != entry.task_counter) { - if (next_task_counter < entry.task_counter) { - /* A task that we have already executed past. Remove it. */ - task = entry.task_queue->erase(task); - /* If there are no more tasks in the queue, wait. */ - if (task == entry.task_queue->end()) { - algorithm_state->actors_with_pending_tasks.erase(actor_id); - return false; - } - /* Move on to the next task. */ - next_task_counter = TaskSpec_actor_counter(task->spec); - } else if (TaskSpec_actor_is_checkpoint_method(task->spec)) { - /* A later task that is a checkpoint method. Checkpoint methods can - * always be executed. */ - break; - } else { - /* A later task that is not a checkpoint. Wait for the preceding tasks to - * execute. */ + if (entry.loaded) { + /* Once the actor has loaded, we can only execute tasks in order of + * task_counter. */ + if (next_task_counter != entry.task_counter) { return false; } + } else { + /* If the actor has not yet loaded, we can only execute the task that + * matches task_counter (the first task), or a checkpoint task. */ + if (next_task_counter != entry.task_counter) { + /* No other task should be first in the queue. */ + CHECK(TaskSpec_actor_is_checkpoint_method(task->spec)); + } } /* If there are not enough resources available, we cannot assign the task. */ @@ -390,32 +392,21 @@ void handle_actor_worker_connect(LocalSchedulerState *state, } /** - * This will add a task to the task queue for an actor. If this is the first - * task being processed for this actor, it is possible that the LocalActorInfo - * struct has not yet been created by create_worker (which happens when the - * actor worker connects to the local scheduler), so in that case this method - * will call create_actor. - * - * This method will also update the task table. TODO(rkn): Should we also update - * the task table in the case where the tasks are cached locally? + * Insert a task queue entry into an actor's dispatch queue. The task is + * inserted in sorted order by task counter. If this is the first task + * scheduled to this actor and the worker process has not yet connected, then + * this also creates a LocalActorInfo entry for the actor. * * @param state The state of the local scheduler. * @param algorithm_state The state of the scheduling algorithm. - * @param spec The task spec to add. - * @param from_global_scheduler True if the task was assigned to this local - * scheduler by the global scheduler and false if it was submitted - * locally by a worker. + * @param task_entry The task queue entry to add to the actor's queue. * @return Void. */ -void add_task_to_actor_queue(LocalSchedulerState *state, +void insert_actor_task_queue(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - TaskSpec *spec, - int64_t task_spec_size, - bool from_global_scheduler) { - ActorID actor_id = TaskSpec_actor_id(spec); - char tmp[ID_STRING_SIZE]; - ObjectID_to_string(actor_id, tmp, ID_STRING_SIZE); - DCHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); + TaskQueueEntry task_entry) { + /* Get the local actor entry for this actor. */ + ActorID actor_id = TaskSpec_actor_id(task_entry.spec); /* Handle the case in which there is no LocalActorInfo struct yet. */ if (algorithm_state->local_actor_infos.count(actor_id) == 0) { @@ -425,12 +416,10 @@ void add_task_to_actor_queue(LocalSchedulerState *state, create_actor(algorithm_state, actor_id, NULL); CHECK(algorithm_state->local_actor_infos.count(actor_id) == 1); } - - /* Get the local actor entry for this actor. */ LocalActorInfo &entry = algorithm_state->local_actor_infos.find(actor_id)->second; - int64_t task_counter = TaskSpec_actor_counter(spec); + int64_t task_counter = TaskSpec_actor_counter(task_entry.spec); /* As a sanity check, the counter of the new task should be greater than the * number of tasks that have executed on this actor so far (since we are * guaranteeing in-order execution of the tasks on the actor). TODO(rkn): This @@ -443,8 +432,6 @@ void add_task_to_actor_queue(LocalSchedulerState *state, return; } - /* Create a new task queue entry. */ - TaskQueueEntry elt = TaskQueueEntry_init(spec, task_spec_size); /* Add the task spec to the actor's task queue in a manner that preserves the * order of the actor task counters. Iterate from the beginning of the queue * to find the right place to insert the task queue entry. TODO(pcm): This @@ -465,7 +452,36 @@ void add_task_to_actor_queue(LocalSchedulerState *state, /* The task has a counter that has not been executed or submitted before. Add * it to the actor queue. */ - entry.task_queue->insert(it, elt); + entry.task_queue->insert(it, task_entry); + + /* Record the fact that this actor has a task waiting to execute. */ + algorithm_state->actors_with_pending_tasks.insert(actor_id); +} + +/** + * Queue a task to be dispatched for an actor. Update the task table for the + * queued task. TODO(rkn): Should we also update the task table in the case + * where the tasks are cached locally? + * + * @param state The state of the local scheduler. + * @param algorithm_state The state of the scheduling algorithm. + * @param spec The task spec to add. + * @param from_global_scheduler True if the task was assigned to this local + * scheduler by the global scheduler and false if it was submitted + * locally by a worker. + * @return Void. + */ +void queue_actor_task(LocalSchedulerState *state, + SchedulingAlgorithmState *algorithm_state, + TaskSpec *spec, + int64_t task_spec_size, + bool from_global_scheduler) { + ActorID actor_id = TaskSpec_actor_id(spec); + DCHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); + + /* Create a new task queue entry. */ + TaskQueueEntry elt = TaskQueueEntry_init(spec, task_spec_size); + insert_actor_task_queue(state, algorithm_state, elt); /* Update the task table. */ if (state->db != NULL) { @@ -483,27 +499,6 @@ void add_task_to_actor_queue(LocalSchedulerState *state, } } - /* Record the fact that this actor has a task waiting to execute. */ - algorithm_state->actors_with_pending_tasks.insert(actor_id); - - /* Register a missing dependency on the preceding task. TODO(swang): Unify - * with `fetch_missing_dependencies` for non-actor tasks. */ - if (entry.task_counter != task_counter) { - int64_t num_args = TaskSpec_num_args(spec); - /* The last argument represents dependency on a preceding task. If it is by - * reference, then it is an explicit dependency. */ - if (TaskSpec_arg_by_ref(spec, num_args - 1)) { - ObjectID dummy_object_id = TaskSpec_arg_id(spec, num_args - 1); - if (algorithm_state->local_objects.count(dummy_object_id) == 0) { - ObjectEntry entry; - /* TODO(swang): Objects in `remote_objects` will get fetched from - * remote plasma managers. Do not fetch actor dummy objects. Otherwise, - * if the plasma manager associated with the dead local scheduler is - * still alive, reconstruction will never complete. */ - state->algorithm_state->remote_objects[dummy_object_id] = entry; - } - } - } } /** @@ -515,12 +510,14 @@ void add_task_to_actor_queue(LocalSchedulerState *state, * @param algorithm_state The scheduling algorithm state. * @param task_entry_it A reference to the task entry in the waiting queue. * @param obj_id The ID of the object that the task is dependent on. + * @param arg_index The object's index in the dependent task's arguments. * @returns Void. */ void fetch_missing_dependency(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, std::list::iterator task_entry_it, - plasma::ObjectID obj_id) { + plasma::ObjectID obj_id, + int64_t arg_index) { if (algorithm_state->remote_objects.count(obj_id) == 0) { /* We weren't actively fetching this object. Try the fetch once * immediately. */ @@ -540,6 +537,15 @@ void fetch_missing_dependency(LocalSchedulerState *state, * the object becomes available locally. It will get freed if the object is * subsequently removed locally. */ ObjectEntry entry; + /* If the task is for an actor, and the missing object is a dummy object, + * then we must generate it locally by executing the corresponding task. + * All other objects may be requested from another plasma manager. */ + if (TaskSpec_is_actor_task(task_entry_it->spec) && + TaskSpec_arg_is_actor_dummy_object(task_entry_it->spec, arg_index)) { + entry.request_transfer = false; + } else { + entry.request_transfer = true; + } algorithm_state->remote_objects[obj_id] = entry; } algorithm_state->remote_objects[obj_id].dependent_tasks.push_back( @@ -550,9 +556,6 @@ void fetch_missing_dependency(LocalSchedulerState *state, * Fetch a queued task's missing object dependencies. The fetch requests will * be retried every kLocalSchedulerFetchTimeoutMilliseconds until all * objects are available locally. - * TODO(swang): For actor task dummy objects, we should still request - * reconstruction for missing dependencies, but we should not request transfer - * from other nodes. * * @param state The scheduler state. * @param algorithm_state The scheduling algorithm state. @@ -566,13 +569,13 @@ void fetch_missing_dependencies( TaskSpec *task = task_entry_it->spec; int64_t num_args = TaskSpec_num_args(task); int num_missing_dependencies = 0; - for (int i = 0; i < num_args; ++i) { + for (int64_t i = 0; i < num_args; ++i) { if (TaskSpec_arg_by_ref(task, i)) { ObjectID obj_id = TaskSpec_arg_id(task, i); if (algorithm_state->local_objects.count(obj_id) == 0) { /* If the entry is not yet available locally, record the dependency. */ fetch_missing_dependency(state, algorithm_state, task_entry_it, - obj_id.to_plasma_id()); + obj_id.to_plasma_id(), i); ++num_missing_dependencies; } } @@ -618,7 +621,9 @@ int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) { std::vector object_id_vec; for (auto const &entry : state->algorithm_state->remote_objects) { - object_id_vec.push_back(entry.first); + if (entry.second.request_transfer) { + object_id_vec.push_back(entry.first); + } } ObjectID *object_ids = object_id_vec.data(); @@ -903,8 +908,13 @@ void queue_dispatch_task(LocalSchedulerState *state, bool from_global_scheduler) { LOG_DEBUG("Queueing task in dispatch queue"); TaskQueueEntry task_entry = TaskQueueEntry_init(spec, task_spec_size); - queue_task(state, algorithm_state->dispatch_task_queue, &task_entry, - from_global_scheduler); + if (TaskSpec_is_actor_task(spec)) { + queue_actor_task(state, algorithm_state, spec, task_spec_size, + from_global_scheduler); + } else { + queue_task(state, algorithm_state->dispatch_task_queue, &task_entry, + from_global_scheduler); + } } /** @@ -943,9 +953,9 @@ void give_task_to_local_scheduler_retry(UniqueID id, CHECK(Task_state(task) == TASK_STATUS_SCHEDULED); TaskSpec *spec = Task_task_spec(task); + CHECK(TaskSpec_is_actor_task(spec)); ActorID actor_id = TaskSpec_actor_id(spec); - CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); CHECK(state->actor_mapping.count(actor_id) == 1); give_task_to_local_scheduler( @@ -992,7 +1002,7 @@ void give_task_to_global_scheduler_retry(UniqueID id, CHECK(Task_state(task) == TASK_STATUS_WAITING); TaskSpec *spec = Task_task_spec(task); - CHECK(ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)); + CHECK(!TaskSpec_is_actor_task(spec)); give_task_to_global_scheduler(state, state->algorithm_state, spec, Task_task_spec_size(task)); @@ -1070,8 +1080,8 @@ void handle_actor_task_submitted(LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, TaskSpec *task_spec, int64_t task_spec_size) { + CHECK(TaskSpec_is_actor_task(task_spec)); ActorID actor_id = TaskSpec_actor_id(task_spec); - CHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); if (state->actor_mapping.count(actor_id) == 0) { /* Add this task to a queue of tasks that have been submitted but the local @@ -1088,8 +1098,8 @@ void handle_actor_task_submitted(LocalSchedulerState *state, get_db_client_id(state->db))) { /* This local scheduler is responsible for the actor, so handle the task * locally. */ - add_task_to_actor_queue(state, algorithm_state, task_spec, task_spec_size, - false); + queue_task_locally(state, algorithm_state, task_spec, task_spec_size, + false); /* Attempt to dispatch tasks to this actor. */ dispatch_actor_task(state, algorithm_state, actor_id); } else { @@ -1149,8 +1159,8 @@ void handle_actor_task_scheduled(LocalSchedulerState *state, DCHECK(state->config.global_scheduler_exists); /* Check that the task is meant to run on an actor that this local scheduler * is responsible for. */ + DCHECK(TaskSpec_is_actor_task(spec)); ActorID actor_id = TaskSpec_actor_id(spec); - DCHECK(!ActorID_equal(actor_id, NIL_ACTOR_ID)); if (state->actor_mapping.count(actor_id) == 1) { DCHECK(DBClientID_equal(state->actor_mapping[actor_id].local_scheduler_id, get_db_client_id(state->db))); @@ -1165,7 +1175,7 @@ void handle_actor_task_scheduled(LocalSchedulerState *state, "corresponding actor_map_entry is not present. This should be rare."); } /* Push the task to the appropriate queue. */ - add_task_to_actor_queue(state, algorithm_state, spec, task_spec_size, true); + queue_task_locally(state, algorithm_state, spec, task_spec_size, true); dispatch_actor_task(state, algorithm_state, actor_id); } @@ -1257,6 +1267,11 @@ void handle_actor_worker_available(LocalSchedulerState *state, * to the assigned counter. */ if (!actor_checkpoint_failed) { entry.task_counter = entry.assigned_task_counter + 1; + /* If a task was assigned to this actor and there was no checkpoint + * failure, then it is now loaded. */ + if (entry.assigned_task_counter > -1) { + entry.loaded = true; + } } entry.assigned_task_counter = -1; entry.worker_available = true; @@ -1329,8 +1344,11 @@ void handle_object_available(LocalSchedulerState *state, * ready to run, move them to the dispatch queue. */ for (auto &it : entry.dependent_tasks) { if (can_run(algorithm_state, it->spec)) { - LOG_DEBUG("Moved task to dispatch queue"); - algorithm_state->dispatch_task_queue->push_back(*it); + if (TaskSpec_is_actor_task(it->spec)) { + insert_actor_task_queue(state, algorithm_state, *it); + } else { + algorithm_state->dispatch_task_queue->push_back(*it); + } /* Remove the entry with a matching TaskSpec pointer from the waiting * queue, but do not free the task spec. */ algorithm_state->waiting_task_queue->erase(it); @@ -1375,17 +1393,42 @@ void handle_object_removed(LocalSchedulerState *state, } } + std::vector empty_actor_queues; + for (auto it = algorithm_state->actors_with_pending_tasks.begin(); + it != algorithm_state->actors_with_pending_tasks.end(); it++) { + auto actor_info = algorithm_state->local_actor_infos[*it]; + for (auto queue_it = actor_info.task_queue->begin(); + queue_it != actor_info.task_queue->end();) { + if (TaskSpec_is_dependent_on(queue_it->spec, removed_object_id)) { + /* This task was dependent on the removed object. */ + LOG_DEBUG("Moved task from actor dispatch queue back to waiting queue"); + algorithm_state->waiting_task_queue->push_back(*queue_it); + /* Remove the task from the dispatch queue, but do not free the task + * spec. */ + queue_it = actor_info.task_queue->erase(queue_it); + if (actor_info.task_queue->size() == 0) { + empty_actor_queues.push_back(*it); + } + } else { + ++queue_it; + } + } + } + for (auto actor_id : empty_actor_queues) { + algorithm_state->actors_with_pending_tasks.erase(actor_id); + } + /* Track the dependency for tasks that are in the waiting queue, including * those that were just moved from the dispatch queue. */ for (auto it = algorithm_state->waiting_task_queue->begin(); it != algorithm_state->waiting_task_queue->end(); ++it) { int64_t num_args = TaskSpec_num_args(it->spec); - for (int i = 0; i < num_args; ++i) { + for (int64_t i = 0; i < num_args; ++i) { if (TaskSpec_arg_by_ref(it->spec, i)) { ObjectID arg_id = TaskSpec_arg_id(it->spec, i); if (ObjectID_equal(arg_id, removed_object_id)) { fetch_missing_dependency(state, algorithm_state, it, - removed_object_id.to_plasma_id()); + removed_object_id.to_plasma_id(), i); } } } diff --git a/test/actor_test.py b/test/actor_test.py index 459089773..cef079b45 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1434,8 +1434,8 @@ class ActorReconstruction(unittest.TestCase): # The most recently executed checkpoint task should throw an exception # when trying to resume. All other checkpoint tasks should reconstruct # the previous task but throw no errors. - self.assertEqual(len([error for error in errors if error[b"type"] == - b"task"]), 1) + self.assertTrue(len([error for error in errors if error[b"type"] == + b"task"]) > 0) ray.worker.cleanup()