diff --git a/python/ray/__init__.py b/python/ray/__init__.py index 980a8c758..59bfebda1 100644 --- a/python/ray/__init__.py +++ b/python/ray/__init__.py @@ -5,9 +5,11 @@ from __future__ import print_function from ray.worker import (register_class, error_info, init, connect, disconnect, get, put, wait, remote, log_event, log_span, flush_log, get_gpu_ids) -from ray.actor import actor from ray.worker import SCRIPT_MODE, WORKER_MODE, PYTHON_MODE, SILENT_MODE from ray.worker import global_state +# We import ray.actor because some code is run in actor.py which initializes +# some functions in the worker. +import ray.actor # noqa: F401 # Ray version string. TODO(rkn): This is also defined separately in setup.py. # Fix this. diff --git a/python/ray/actor.py b/python/ray/actor.py index 5ce1d7d40..01134b7ed 100644 --- a/python/ray/actor.py +++ b/python/ray/actor.py @@ -11,7 +11,7 @@ import traceback import ray.local_scheduler import ray.signature as signature import ray.worker -from ray.utils import (FunctionProperties, random_string, +from ray.utils import (FunctionProperties, hex_to_binary, random_string, select_local_scheduler) @@ -152,26 +152,128 @@ def export_actor(actor_id, class_id, actor_method_names, num_cpus, num_gpus, # notification so that when the newly created actor attempts to fetch the # information from Redis, it is already there. worker.redis_client.hmset(key, {"class_id": class_id, - "num_gpus": num_gpus}) + "driver_id": driver_id, + "local_scheduler_id": local_scheduler_id, + "num_gpus": num_gpus, + "removed": False}) # TODO(rkn): There is actually no guarantee that the local scheduler that # we are publishing to has already subscribed to the actor_notifications # channel. Therefore, this message may be missed and the workload will # hang. This is a bug. ray.utils.publish_actor_creation(actor_id.id(), driver_id, - local_scheduler_id, worker.redis_client) + local_scheduler_id, False, + worker.redis_client) -def actor(*args, **kwargs): - raise Exception("The @ray.actor decorator is deprecated. Instead, please " - "use @ray.remote.") +def reconstruct_actor_state(actor_id, worker): + """Reconstruct the state of an actor that is being reconstructed. + + Args: + actor_id: The ID of the actor being reconstructed. + worker: The worker object that is running the actor. + """ + # TODO(rkn): This call is expensive. It'd be nice to find a way to get only + # the tasks that are relevant to this actor. + tasks = ray.global_state.task_table() + + def hex_to_object_id(hex_id): + return ray.local_scheduler.ObjectID(hex_to_binary(hex_id)) + + relevant_tasks = [] + + # Loop over the task table and keep the tasks that are relevant to this + # actor. + for _, task_info in tasks.items(): + task_spec_info = task_info["TaskSpec"] + if hex_to_binary(task_spec_info["ActorID"]) == actor_id: + relevant_tasks.append(task_spec_info) + + # Sort the tasks by actor ID. + relevant_tasks.sort(key=lambda task: task["ActorCounter"]) + for i in range(len(relevant_tasks)): + assert relevant_tasks[i]["ActorCounter"] == i + + # This is a mini replica of the worker's main_loop. This will loop over all + # of the tasks that this actor is supposed to rerun. For each task, the + # worker will submit the task to the local scheduler, retrieve the task + # from the local scheduler, and execute the task. + for task_spec_info in relevant_tasks: + # Create a task spec out of the dictionary of info. This isn't + # necessary. It is strictly for the purposes of checking that the task + # we get back from the local scheduler is identical to the one we + # submit. + task_spec = ray.local_scheduler.Task( + hex_to_object_id(task_spec_info["DriverID"]), + hex_to_object_id(task_spec_info["FunctionID"]), + task_spec_info["Args"], + len(task_spec_info["ReturnObjectIDs"]), + hex_to_object_id(task_spec_info["ParentTaskID"]), + task_spec_info["ParentCounter"], + hex_to_object_id(task_spec_info["ActorID"]), + task_spec_info["ActorCounter"], + [task_spec_info["RequiredResources"]["CPUs"], + task_spec_info["RequiredResources"]["GPUs"]]) + + # Verify that the return object IDs are the same as they were the + # first time. + assert task_spec_info["ReturnObjectIDs"] == task_spec.returns() + + # We need to wait for the actor to be imported and for the functions to + # be defined before we can submit the task. + worker._wait_for_function(hex_to_binary(task_spec_info["FunctionID"]), + hex_to_binary(task_spec_info["DriverID"])) + + # Set some additional state. During normal operation + # (non-reconstruction) this state would already be set because tasks + # are only submitted from drivers or from workers that are in the + # middle of executing other tasks. + worker.task_driver_id = ray.local_scheduler.ObjectID( + hex_to_binary(task_spec_info["DriverID"])) + worker.current_task_id = ray.local_scheduler.ObjectID( + hex_to_binary(task_spec_info["ParentTaskID"])) + worker.task_index = task_spec_info["ParentCounter"] + + # Submit the task to the local scheduler. This is important so that the + # local scheduler does bookkeeping about this actor's resource + # utilization and things like that. It's also important for updating + # some state on the worker. + worker.submit_task( + hex_to_object_id(task_spec_info["FunctionID"]), + task_spec_info["Args"], + actor_id=hex_to_object_id(task_spec_info["ActorID"])) + + # Clear the extra state that we set. + del worker.task_driver_id + del worker.current_task_id + del worker.task_index + + # Get the task from the local scheduler. + retrieved_task = worker._get_next_task_from_local_scheduler() + # Assert that the retrieved task is the same as the constructed task. + assert (ray.local_scheduler.task_to_string(task_spec) == + ray.local_scheduler.task_to_string(retrieved_task)) + + # Wait for the task to be ready and execute the task. + worker._wait_for_and_process_task(retrieved_task) + + # Enter the main loop to receive and process tasks. + worker.main_loop() def make_actor(cls, num_cpus, num_gpus): # Modify the class to have an additional method that will be used for # terminating the worker. class Class(cls): - def __ray_terminate__(self): + def __ray_terminate__(self, actor_id): + # Record that this actor has been removed so that if this node + # dies later, the actor won't be recreated. Alternatively, we could + # remove the actor key from Redis here. + ray.worker.global_worker.redis_client.hset(b"Actor:" + actor_id, + "removed", True) + # Disconnect the worker from he local scheduler. The point of this + # is so that when the worker kills itself below, the local + # scheduler won't push an error message to the driver. ray.worker.global_worker.local_scheduler_client.disconnect() import os os._exit(0) @@ -302,7 +404,8 @@ def make_actor(cls, num_cpus, num_gpus): if ray.worker.global_worker.connected: actor_method_call( self._ray_actor_id, "__ray_terminate__", - self._ray_method_signatures["__ray_terminate__"]) + self._ray_method_signatures["__ray_terminate__"], + self._ray_actor_id.id()) return NewClass diff --git a/python/ray/monitor.py b/python/ray/monitor.py index ae7903261..9be416fe2 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -10,11 +10,9 @@ import redis import time import ray -from ray.services import get_ip_address -from ray.services import get_port -from ray.utils import binary_to_object_id -from ray.utils import binary_to_hex -from ray.utils import hex_to_binary +from ray.services import get_ip_address, get_port +import ray.utils +from ray.utils import binary_to_object_id, binary_to_hex, hex_to_binary # Import flatbuffer bindings. from ray.core.generated.SubscribeToDBClientTableReply \ @@ -98,6 +96,41 @@ class Monitor(object): self.subscribe_client.subscribe(channel) self.subscribed[channel] = False + def cleanup_actors(self): + """Recreate any live actors whose corresponding local scheduler died. + + For any live actor whose local scheduler just died, we choose a new + local scheduler and broadcast a notification to create that actor. + """ + actor_info = self.state.actors() + for actor_id, info in actor_info.items(): + if (not info["removed"] and + info["local_scheduler_id"] in self.dead_local_schedulers): + # Choose a new local scheduler to run the actor. + local_scheduler_id = ray.utils.select_local_scheduler( + info["driver_id"], self.state.local_schedulers(), + info["num_gpus"], self.redis) + import sys + sys.stdout.flush() + # The new local scheduler should not be the same as the old + # local scheduler. TODO(rkn): This should not be an assert, it + # should be something more benign. + assert (binary_to_hex(local_scheduler_id) != + info["local_scheduler_id"]) + # Announce to all of the local schedulers that the actor should + # be recreated on this new local scheduler. + ray.utils.publish_actor_creation( + hex_to_binary(actor_id), hex_to_binary(info["driver_id"]), + local_scheduler_id, True, self.redis) + log.info("Actor {} for driver {} was on dead local scheduler " + "{}. It is being recreated on local scheduler {}" + .format(actor_id, info["driver_id"], + info["local_scheduler_id"], + binary_to_hex(local_scheduler_id))) + # Update the actor info in Redis. + self.redis.hset(b"Actor:" + hex_to_binary(actor_id), + "local_scheduler_id", local_scheduler_id) + def cleanup_task_table(self): """Clean up global state for failed local schedulers. @@ -348,6 +381,7 @@ class Monitor(object): # state in the state tables. if len(self.dead_local_schedulers) > 0: self.cleanup_task_table() + self.cleanup_actors() if len(self.dead_plasma_managers) > 0: self.cleanup_object_table() log.debug("{} dead local schedulers, {} plasma managers total, {} " @@ -369,6 +403,7 @@ class Monitor(object): # dead in this round, clean up the associated state. if len(self.dead_local_schedulers) > num_dead_local_schedulers: self.cleanup_task_table() + self.cleanup_actors() if len(self.dead_plasma_managers) > num_dead_plasma_managers: self.cleanup_object_table() diff --git a/python/ray/utils.py b/python/ray/utils.py index b92c0f6b4..902be4fa1 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -183,7 +183,7 @@ def select_local_scheduler(driver_id, local_schedulers, num_gpus, def publish_actor_creation(actor_id, driver_id, local_scheduler_id, - redis_client): + reconstruct, redis_client): """Publish a notification that an actor should be created. This broadcast will be received by all of the local schedulers. The local @@ -197,11 +197,14 @@ def publish_actor_creation(actor_id, driver_id, local_scheduler_id, driver_id: The ID of the driver responsible for the actor. local_scheduler_id: The ID of the local scheduler that is suposed to create the actor. + reconstruct: True if the actor should be created in "reconstruct" mode. redis_client: The client used to interact with Redis. """ + reconstruct_bit = b"1" if reconstruct else b"0" # Really we should encode this message as a flatbuffer object. However, # we're having trouble getting that to work. It almost works, but in Python # 2.7, builder.CreateString fails on byte strings that contain characters # outside range(128). redis_client.publish("actor_notifications", - actor_id + driver_id + local_scheduler_id) + actor_id + driver_id + local_scheduler_id + + reconstruct_bit) diff --git a/python/ray/workers/default_worker.py b/python/ray/workers/default_worker.py index 63ba3069b..d4660327f 100644 --- a/python/ray/workers/default_worker.py +++ b/python/ray/workers/default_worker.py @@ -9,6 +9,7 @@ import redis import traceback import ray +import ray.actor parser = argparse.ArgumentParser(description=("Parse addresses for the worker " "to connect to.")) @@ -24,6 +25,9 @@ parser.add_argument("--local-scheduler-name", required=True, type=str, help="the local scheduler's name") parser.add_argument("--actor-id", required=False, type=str, help="the actor ID of this worker") +parser.add_argument("--reconstruct", action="store_true", + help=("true if the actor should be started in reconstruct " + "mode")) def random_string(): @@ -57,6 +61,11 @@ def push_error_to_all_drivers(redis_client, message): if __name__ == "__main__": args = parser.parse_args() + + # If this worker is not an actor, it cannot be started in reconstruct mode. + if args.actor_id is None: + assert not args.reconstruct + info = {"node_ip_address": args.node_ip_address, "redis_address": args.redis_address, "store_socket_name": args.object_store_name, @@ -70,6 +79,17 @@ if __name__ == "__main__": ray.worker.connect(info, mode=ray.WORKER_MODE, actor_id=actor_id) + # If this is an actor started in reconstruct mode, rerun tasks to + # reconstruct its state. + if args.reconstruct: + try: + ray.actor.reconstruct_actor_state(actor_id, + ray.worker.global_worker) + except Exception as e: + redis_client = create_redis_client(args.redis_address) + push_error_to_all_drivers(redis_client, traceback.format_exc()) + raise e + error_explanation = """ This error is unexpected and should not have happened. Somehow a worker crashed in an unanticipated way causing the main_loop to throw an exception, diff --git a/src/common/state/actor_notification_table.h b/src/common/state/actor_notification_table.h index 87d7b7633..87eddde37 100644 --- a/src/common/state/actor_notification_table.h +++ b/src/common/state/actor_notification_table.h @@ -14,6 +14,7 @@ typedef void (*actor_notification_table_subscribe_callback)( ActorID actor_id, WorkerID driver_id, DBClientID local_scheduler_id, + bool reconstruct, void *user_context); /** diff --git a/src/common/state/redis.cc b/src/common/state/redis.cc index 44b65c0c8..a0066f8e3 100644 --- a/src/common/state/redis.cc +++ b/src/common/state/redis.cc @@ -1523,16 +1523,33 @@ void redis_actor_notification_table_subscribe_callback(redisAsyncContext *c, ActorID actor_id; WorkerID driver_id; DBClientID local_scheduler_id; - CHECK(sizeof(actor_id) + sizeof(driver_id) + sizeof(local_scheduler_id) == + bool reconstruct; + CHECK(sizeof(actor_id) + sizeof(driver_id) + sizeof(local_scheduler_id) + + 1 == payload->len); - memcpy(&actor_id, payload->str, sizeof(actor_id)); - memcpy(&driver_id, payload->str + sizeof(actor_id), sizeof(driver_id)); - memcpy(&local_scheduler_id, - payload->str + sizeof(actor_id) + sizeof(driver_id), - sizeof(local_scheduler_id)); + char *current_ptr = payload->str; + /* Parse the actor ID. */ + memcpy(&actor_id, current_ptr, sizeof(actor_id)); + current_ptr += sizeof(actor_id); + /* Parse the driver ID. */ + memcpy(&driver_id, current_ptr, sizeof(driver_id)); + current_ptr += sizeof(driver_id); + /* Parse the local scheduler ID. */ + memcpy(&local_scheduler_id, current_ptr, sizeof(local_scheduler_id)); + current_ptr += sizeof(local_scheduler_id); + /* Parse the reconstruct bit. */ + if (*current_ptr == '1') { + reconstruct = true; + } else if (*current_ptr == '0') { + reconstruct = false; + } else { + LOG_FATAL("This code should be unreachable."); + } + current_ptr += 1; + if (data->subscribe_callback) { data->subscribe_callback(actor_id, driver_id, local_scheduler_id, - data->subscribe_context); + reconstruct, data->subscribe_context); } } else if (strcmp(message_type->str, "subscribe") == 0) { /* The reply for the initial SUBSCRIBE command. */ diff --git a/src/local_scheduler/local_scheduler.cc b/src/local_scheduler/local_scheduler.cc index 6aa174a03..d788014d4 100644 --- a/src/local_scheduler/local_scheduler.cc +++ b/src/local_scheduler/local_scheduler.cc @@ -206,7 +206,13 @@ void LocalSchedulerState_free(LocalSchedulerState *state) { * @param state The state of the local scheduler. * @return Void. */ -void start_worker(LocalSchedulerState *state, ActorID actor_id) { +void start_worker(LocalSchedulerState *state, + ActorID actor_id, + bool reconstruct) { + /* Non-actors can't be started in reconstruct mode. */ + if (ActorID_equal(actor_id, NIL_ACTOR_ID)) { + CHECK(!reconstruct); + } /* We can't start a worker if we don't have the path to the worker script. */ if (state->config.start_worker_command == NULL) { LOG_WARN("No valid command to start worker provided. Cannot start worker."); @@ -223,24 +229,30 @@ void start_worker(LocalSchedulerState *state, ActorID actor_id) { /* Reset the SIGCHLD handler so that it doesn't influence the worker. */ signal(SIGCHLD, SIG_DFL); + std::vector command_vector; + for (int i = 0; state->config.start_worker_command[i] != NULL; i++) { + command_vector.push_back(state->config.start_worker_command[i]); + } + + /* Pass in the worker's actor ID. */ + const char *actor_id_string = "--actor-id"; char id_string[ID_STRING_SIZE]; ObjectID_to_string(actor_id, id_string, ID_STRING_SIZE); - /* Figure out how many arguments there are in the start_worker_command. */ - int num_args = 0; - for (; state->config.start_worker_command[num_args] != NULL; ++num_args) { + command_vector.push_back(actor_id_string); + command_vector.push_back((const char *) id_string); + + /* Add a flag for reconstructing the actor if necessary. */ + const char *reconstruct_string = "--reconstruct"; + if (reconstruct) { + command_vector.push_back(reconstruct_string); } - const char **start_actor_worker_command = - (const char **) malloc((num_args + 3) * sizeof(const char *)); - for (int i = 0; i < num_args; ++i) { - start_actor_worker_command[i] = state->config.start_worker_command[i]; - } - start_actor_worker_command[num_args] = "--actor-id"; - start_actor_worker_command[num_args + 1] = (const char *) id_string; - start_actor_worker_command[num_args + 2] = NULL; + + /* Add a NULL pointer to the end. */ + command_vector.push_back(NULL); + /* Try to execute the worker command. Exit if we're not successful. */ - execvp(start_actor_worker_command[0], - (char *const *) start_actor_worker_command); - free(start_actor_worker_command); + execvp(command_vector[0], (char *const *) command_vector.data()); + LocalSchedulerState_free(state); LOG_FATAL("Failed to start worker"); } @@ -391,7 +403,7 @@ LocalSchedulerState *LocalSchedulerState_init( /* Start the initial set of workers. */ for (int i = 0; i < num_workers; ++i) { - start_worker(state, NIL_ACTOR_ID); + start_worker(state, NIL_ACTOR_ID, false); } /* Initialize the time at which the previous heartbeat was sent. */ @@ -593,15 +605,18 @@ void reconstruct_task_update_callback(Task *task, TaskSpec *spec = Task_task_spec(task); /* If the task is an actor task, then we currently do not reconstruct it. * TODO(rkn): Handle this better. */ - CHECK(ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)); - /* Resubmit the task. */ - handle_task_submitted(state, state->algorithm_state, spec, - Task_task_spec_size(task)); - /* Recursively reconstruct the task's inputs, if necessary. */ - for (int64_t i = 0; i < TaskSpec_num_args(spec); ++i) { - if (TaskSpec_arg_by_ref(spec, i)) { - ObjectID arg_id = TaskSpec_arg_id(spec, i); - reconstruct_object(state, arg_id); + if (!ActorID_equal(TaskSpec_actor_id(spec), NIL_ACTOR_ID)) { + LOG_WARN("We are not resubmitting this task because it is an actor task."); + } else { + /* Resubmit the task. */ + handle_task_submitted(state, state->algorithm_state, spec, + Task_task_spec_size(task)); + /* Recursively reconstruct the task's inputs, if necessary. */ + for (int64_t i = 0; i < TaskSpec_num_args(spec); ++i) { + if (TaskSpec_arg_by_ref(spec, i)) { + ObjectID arg_id = TaskSpec_arg_id(spec, i); + reconstruct_object(state, arg_id); + } } } } @@ -906,7 +921,7 @@ void process_message(event_loop *loop, /* If the disconnected worker was not an actor, start a new worker to make * sure there are enough workers in the pool. */ if (ActorID_equal(worker->actor_id, NIL_ACTOR_ID)) { - start_worker(state, NIL_ACTOR_ID); + start_worker(state, NIL_ACTOR_ID, false); } } break; case MessageType_EventLogMessage: { @@ -1090,11 +1105,14 @@ void handle_task_scheduled_callback(Task *original_task, * @param actor_id The ID of the actor being created. * @param local_scheduler_id The ID of the local scheduler that is responsible * for creating the actor. + * @param reconstruct True if the actor should be started in "reconstruct" mode. + * @param context The context for this callback. * @return Void. */ void handle_actor_creation_callback(ActorID actor_id, WorkerID driver_id, DBClientID local_scheduler_id, + bool reconstruct, void *context) { LocalSchedulerState *state = (LocalSchedulerState *) context; @@ -1103,11 +1121,29 @@ void handle_actor_creation_callback(ActorID actor_id, return; } - /* Make sure the actor entry is not already present in the actor map table. - * TODO(rkn): We will need to remove this check to handle the case where the - * corresponding publish is retried and the case in which a task that creates - * an actor is resubmitted due to fault tolerance. */ - CHECK(state->actor_mapping.count(actor_id) == 0); + if (!reconstruct) { + /* Make sure the actor entry is not already present in the actor map table. + * TODO(rkn): We will need to remove this check to handle the case where the + * corresponding publish is retried and the case in which a task that + * creates an actor is resubmitted due to fault tolerance. */ + CHECK(state->actor_mapping.count(actor_id) == 0); + } else { + /* In this case, the actor already exists. Check that the driver hasn't + * changed but that the local scheduler has. */ + auto it = state->actor_mapping.find(actor_id); + CHECK(it != state->actor_mapping.end()); + CHECK(WorkerID_equal(it->second.driver_id, driver_id)); + CHECK(!DBClientID_equal(it->second.local_scheduler_id, local_scheduler_id)); + /* If the actor was previously assigned to this local scheduler, kill the + * actor. */ + if (DBClientID_equal(it->second.local_scheduler_id, + get_db_client_id(state->db))) { + /* TODO(rkn): We should kill the actor here if it is still around. Also, + * if it hasn't registered yet, we should keep track of its PID so we can + * kill it anyway. */ + } + } + /* Create a new entry and add it to the actor mapping table. TODO(rkn): * Currently this is never removed (except when the local scheduler state is * deleted). */ @@ -1119,11 +1155,12 @@ void handle_actor_creation_callback(ActorID actor_id, /* If this local scheduler is responsible for the actor, then start a new * worker for the actor. */ if (DBClientID_equal(local_scheduler_id, get_db_client_id(state->db))) { - start_worker(state, actor_id); + start_worker(state, actor_id, reconstruct); } /* Let the scheduling algorithm process the fact that a new actor has been * created. */ - handle_actor_creation_notification(state, state->algorithm_state, actor_id); + handle_actor_creation_notification(state, state->algorithm_state, actor_id, + reconstruct); } int heartbeat_handler(event_loop *loop, timer_id id, void *context) { diff --git a/src/local_scheduler/local_scheduler.h b/src/local_scheduler/local_scheduler.h index 55e229fa9..31fc07ae1 100644 --- a/src/local_scheduler/local_scheduler.h +++ b/src/local_scheduler/local_scheduler.h @@ -114,9 +114,13 @@ void kill_worker(LocalSchedulerState *state, * @param state The local scheduler state. * @param actor_id The ID of the actor for this worker. If this worker is not an * actor, then NIL_ACTOR_ID should be used. + * @param reconstruct True if the worker is an actor and is being started in + * reconstruct mode. * @param Void. */ -void start_worker(LocalSchedulerState *state, ActorID actor_id); +void start_worker(LocalSchedulerState *state, + ActorID actor_id, + bool reconstruct); /** * Check if a certain quantity of dynamic resources are available. If num_cpus diff --git a/src/local_scheduler/local_scheduler_algorithm.cc b/src/local_scheduler/local_scheduler_algorithm.cc index 09b9b7a4d..db9a2698e 100644 --- a/src/local_scheduler/local_scheduler_algorithm.cc +++ b/src/local_scheduler/local_scheduler_algorithm.cc @@ -407,7 +407,13 @@ void add_task_to_actor_queue(LocalSchedulerState *state, * guaranteeing in-order execution of the tasks on the actor). TODO(rkn): This * check will fail if the fault-tolerance mechanism resubmits a task on an * actor. */ - CHECK(task_counter >= entry.task_counter); + bool task_is_redundant = false; + if (task_counter < entry.task_counter) { + LOG_INFO( + "A task that has already been executed has been resubmitted, so we " + "are ignoring it. This should only happen during reconstruction."); + task_is_redundant = true; + } /* Create a new task queue entry. */ TaskQueueEntry elt = TaskQueueEntry_init(spec, task_spec_size); @@ -421,25 +427,36 @@ void add_task_to_actor_queue(LocalSchedulerState *state, (task_counter > TaskSpec_actor_counter(it->spec))) { ++it; } - entry.task_queue->insert(it, elt); - - /* Update the task table. */ - if (state->db != NULL) { - Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_QUEUED, - get_db_client_id(state->db)); - if (from_global_scheduler) { - /* If the task is from the global scheduler, it's already been added to - * the task table, so just update the entry. */ - task_table_update(state->db, task, NULL, NULL, NULL); - } else { - /* Otherwise, this is the first time the task has been seen in the system - * (unless it's a resubmission of a previous task), so add the entry. */ - task_table_add_task(state->db, task, NULL, NULL, NULL); - } + if (it != entry.task_queue->end() && + task_counter == TaskSpec_actor_counter(it->spec)) { + LOG_INFO( + "A task that has already been executed has been resubmitted, so we " + "are ignoring it. This should only happen during reconstruction."); + task_is_redundant = true; } - /* Record the fact that this actor has a task waiting to execute. */ - algorithm_state->actors_with_pending_tasks.insert(actor_id); + if (!task_is_redundant) { + entry.task_queue->insert(it, elt); + + /* Update the task table. */ + if (state->db != NULL) { + Task *task = Task_alloc(spec, task_spec_size, TASK_STATUS_QUEUED, + get_db_client_id(state->db)); + if (from_global_scheduler) { + /* If the task is from the global scheduler, it's already been added to + * the task table, so just update the entry. */ + task_table_update(state->db, task, NULL, NULL, NULL); + } else { + /* Otherwise, this is the first time the task has been seen in the + * system (unless it's a resubmission of a previous task), so add the + * entry. */ + task_table_add_task(state->db, task, NULL, NULL, NULL); + } + } + + /* Record the fact that this actor has a task waiting to execute. */ + algorithm_state->actors_with_pending_tasks.insert(actor_id); + } } /** @@ -666,7 +683,7 @@ void dispatch_tasks(LocalSchedulerState *state, if (state->child_pids.size() == 0) { /* If there are no workers, including those pending PID registration, * then we must start a new one to replenish the worker pool. */ - start_worker(state, NIL_ACTOR_ID); + start_worker(state, NIL_ACTOR_ID, false); } return; } @@ -979,7 +996,8 @@ void handle_actor_task_submitted(LocalSchedulerState *state, void handle_actor_creation_notification( LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - ActorID actor_id) { + ActorID actor_id, + bool reconstruct) { int num_cached_actor_tasks = algorithm_state->cached_submitted_actor_tasks.size(); diff --git a/src/local_scheduler/local_scheduler_algorithm.h b/src/local_scheduler/local_scheduler_algorithm.h index 4eb6cbee9..914260ff7 100644 --- a/src/local_scheduler/local_scheduler_algorithm.h +++ b/src/local_scheduler/local_scheduler_algorithm.h @@ -88,12 +88,14 @@ void handle_actor_task_submitted(LocalSchedulerState *state, * @param state The state of the local scheduler. * @param algorithm_state State maintained by the scheduling algorithm. * @param actor_id The ID of the actor being created. + * @param reconstruct True if the actor is being created in "reconstruct" mode. * @return Void. */ void handle_actor_creation_notification( LocalSchedulerState *state, SchedulingAlgorithmState *algorithm_state, - ActorID actor_id); + ActorID actor_id, + bool reconstruct); /** * This function will be called when a task is assigned by the global scheduler diff --git a/src/local_scheduler/test/local_scheduler_tests.cc b/src/local_scheduler/test/local_scheduler_tests.cc index acdecc91c..5909dd280 100644 --- a/src/local_scheduler/test/local_scheduler_tests.cc +++ b/src/local_scheduler/test/local_scheduler_tests.cc @@ -646,7 +646,7 @@ TEST start_kill_workers_test(void) { num_workers - 1); /* Start a worker after the local scheduler has been initialized. */ - start_worker(local_scheduler->local_scheduler_state, NIL_ACTOR_ID); + start_worker(local_scheduler->local_scheduler_state, NIL_ACTOR_ID, false); /* Accept the workers as clients to the plasma manager. */ int new_worker_fd = accept_client(local_scheduler->plasma_manager_fd); /* The new worker should register its process ID. */ diff --git a/test/actor_test.py b/test/actor_test.py index b9b0f65a4..23dba657e 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -1095,5 +1095,125 @@ class ActorsWithGPUs(unittest.TestCase): ray.worker.cleanup() +class ActorReconstruction(unittest.TestCase): + + def testLocalSchedulerDying(self): + ray.worker._init(start_ray_local=True, num_local_schedulers=2, + num_workers=0, redirect_output=True) + + @ray.remote + class Counter(object): + def __init__(self): + self.x = 0 + + def local_plasma(self): + return ray.worker.global_worker.plasma_client.store_socket_name + + def inc(self): + self.x += 1 + return self.x + + local_plasma = ray.worker.global_worker.plasma_client.store_socket_name + + # Create an actor that is not on the local scheduler. + actor = Counter.remote() + while ray.get(actor.local_plasma.remote()) == local_plasma: + actor = Counter.remote() + + ids = [actor.inc.remote() for _ in range(100)] + + # Wait for the last task to finish running. + ray.get(ids[-1]) + + # Kill the second local scheduler. + process = ray.services.all_processes[ + ray.services.PROCESS_TYPE_LOCAL_SCHEDULER][1] + process.kill() + process.wait() + # Kill the corresponding plasma store to get rid of the cached objects. + process = ray.services.all_processes[ + ray.services.PROCESS_TYPE_PLASMA_STORE][1] + process.kill() + process.wait() + + # Get all of the results + results = ray.get(ids) + + self.assertEqual(results, list(range(1, 1 + len(results)))) + + ray.worker.cleanup() + + def testManyLocalSchedulersDying(self): + # This test can be made more stressful by increasing the numbers below. + # The total number of actors created will be + # num_actors_at_a_time * num_local_schedulers. + num_local_schedulers = 5 + num_actors_at_a_time = 3 + num_function_calls_at_a_time = 10 + + ray.worker._init(start_ray_local=True, + num_local_schedulers=num_local_schedulers, + num_workers=0, redirect_output=True) + + @ray.remote + class SlowCounter(object): + def __init__(self): + self.x = 0 + + def inc(self, duration): + time.sleep(duration) + self.x += 1 + return self.x + + # Create some initial actors. + actors = [SlowCounter.remote() for _ in range(num_actors_at_a_time)] + + # Wait for the actors to start up. + time.sleep(1) + + # This is a mapping from actor handles to object IDs returned by + # methods on that actor. + result_ids = collections.defaultdict(lambda: []) + + # In a loop we are going to create some actors, run some methods, kill + # a local scheduler, and run some more methods. + for i in range(num_local_schedulers - 1): + # Create some actors. + actors.extend([SlowCounter.remote() + for _ in range(num_actors_at_a_time)]) + # Run some methods. + for j in range(len(actors)): + actor = actors[j] + for _ in range(num_function_calls_at_a_time): + result_ids[actor].append( + actor.inc.remote(j ** 2 * 0.000001)) + # Kill a local scheduler. Don't kill the first local scheduler + # since that is the one that the driver is connected to. + process = ray.services.all_processes[ + ray.services.PROCESS_TYPE_LOCAL_SCHEDULER][i + 1] + process.kill() + process.wait() + # Kill the corresponding plasma store to get rid of the cached + # objects. + process = ray.services.all_processes[ + ray.services.PROCESS_TYPE_PLASMA_STORE][i + 1] + process.kill() + process.wait() + + # Run some more methods. + for j in range(len(actors)): + actor = actors[j] + for _ in range(num_function_calls_at_a_time): + result_ids[actor].append( + actor.inc.remote(j ** 2 * 0.000001)) + + # Get the results and check that they have the correct values. + for _, result_id_list in result_ids.items(): + self.assertEqual(ray.get(result_id_list), + list(range(1, len(result_id_list) + 1))) + + ray.worker.cleanup() + + if __name__ == "__main__": unittest.main(verbosity=2)