mirror of
https://github.com/wassname/ray.git
synced 2026-07-02 17:57:14 +08:00
1887 lines
80 KiB
C++
1887 lines
80 KiB
C++
#include "local_scheduler_algorithm.h"

#include <algorithm>
#include <list>
#include <memory>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "state/task_table.h"
#include "state/actor_notification_table.h"
#include "state/db_client_table.h"
#include "state/error_table.h"
#include "state/local_scheduler_table.h"
#include "state/object_table.h"
#include "local_scheduler_shared.h"
#include "local_scheduler.h"
#include "common/task.h"
|
|
|
|
/* Declared for convenience. */
|
|
void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id);
|
|
|
|
void give_task_to_global_scheduler(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
TaskExecutionSpec &execution_spec);
|
|
|
|
void give_task_to_local_scheduler(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
TaskExecutionSpec &execution_spec,
|
|
DBClientID local_scheduler_id);
|
|
|
|
void clear_missing_dependencies(SchedulingAlgorithmState *algorithm_state,
|
|
std::list<TaskExecutionSpec>::iterator it);
|
|
|
|
/** A data structure used to track which objects are available locally and
|
|
* which objects are being actively fetched. Objects of this type are used for
|
|
* both the scheduling algorithm state's local_objects and remote_objects
|
|
* tables. An ObjectEntry should be in at most one of the tables and not both
|
|
* simultaneously. */
|
|
struct ObjectEntry {
|
|
/** A vector of tasks dependent on this object. These tasks are a subset of
|
|
* the tasks in the waiting queue. Each element actually stores a reference
|
|
* to the corresponding task's queue entry in waiting queue, for fast
|
|
* deletion when all of the task's dependencies become available. */
|
|
std::vector<std::list<TaskExecutionSpec>::iterator> dependent_tasks;
|
|
/** Whether or not to request a transfer of this object. This should be set
|
|
* to true for all objects except for actor dummy objects, where the object
|
|
* must be generated by executing the task locally. */
|
|
bool request_transfer;
|
|
};
|
|
|
|
/** This struct contains information about a specific actor. This struct will be
|
|
* used inside of a hash table. */
|
|
typedef struct {
|
|
/** The number of tasks that have been executed on this actor so far, per
|
|
* handle. This is used to guarantee execution of tasks on actors in the
|
|
* order that the tasks were submitted, per handle. Tasks from different
|
|
* handles to the same actor may be interleaved. */
|
|
std::unordered_map<ActorHandleID, int64_t> task_counters;
|
|
/** These are the execution dependencies that make up the frontier of the
|
|
* actor's runnable tasks. For each actor handle, we store the object ID
|
|
* that represents the execution dependency for the next runnable task
|
|
* submitted by that handle. */
|
|
std::unordered_map<ActorHandleID, ObjectID> frontier_dependencies;
|
|
/** The return value of the most recently executed task. The next task to
|
|
* execute should take this as an execution dependency at dispatch time. Set
|
|
* to nil if there are no execution dependencies (e.g., this is the first
|
|
* task to execute). */
|
|
ObjectID execution_dependency;
|
|
/** A queue of tasks to be executed on this actor. The tasks will be sorted by
|
|
* the order of their actor counters. */
|
|
std::list<TaskExecutionSpec> *task_queue;
|
|
/** The worker that the actor is running on. */
|
|
LocalSchedulerClient *worker;
|
|
/** True if the worker is available and false otherwise. */
|
|
bool worker_available;
|
|
} LocalActorInfo;
|
|
|
|
/** Part of the local scheduler state that is maintained by the scheduling
|
|
* algorithm. */
|
|
struct SchedulingAlgorithmState {
|
|
/** An array of pointers to tasks that are waiting for dependencies. */
|
|
std::list<TaskExecutionSpec> *waiting_task_queue;
|
|
/** An array of pointers to tasks whose dependencies are ready but that are
|
|
* waiting to be assigned to a worker. */
|
|
std::list<TaskExecutionSpec> *dispatch_task_queue;
|
|
/** This is a hash table from actor ID to information about that actor. In
|
|
* particular, a queue of tasks that are waiting to execute on that actor.
|
|
* This is only used for actors that exist locally. */
|
|
std::unordered_map<ActorID, LocalActorInfo> local_actor_infos;
|
|
/** This is a set of the IDs of the actors that have tasks waiting to run.
|
|
* The purpose is to make it easier to dispatch tasks without looping over
|
|
* all of the actors. Note that this is an optimization and is not strictly
|
|
* necessary. */
|
|
std::unordered_set<ActorID> actors_with_pending_tasks;
|
|
/** A vector of actor tasks that have been submitted but this local scheduler
|
|
* doesn't know which local scheduler is responsible for them, so cannot
|
|
* assign them to the correct local scheduler yet. Whenever a notification
|
|
* about a new local scheduler arrives, we will resubmit all of these tasks
|
|
* locally. */
|
|
std::vector<TaskExecutionSpec> cached_submitted_actor_tasks;
|
|
/** An array of pointers to workers in the worker pool. These are workers
|
|
* that have registered a PID with us and that are now waiting to be
|
|
* assigned a task to execute. */
|
|
std::vector<LocalSchedulerClient *> available_workers;
|
|
/** An array of pointers to workers that are currently executing a task,
|
|
* unblocked. These are the workers that are leasing some number of
|
|
* resources. */
|
|
std::vector<LocalSchedulerClient *> executing_workers;
|
|
/** An array of pointers to workers that are currently executing a task,
|
|
* blocked on some object(s) that isn't available locally yet. These are the
|
|
* workers that are executing a task, but that have temporarily returned the
|
|
* task's required resources. */
|
|
std::vector<LocalSchedulerClient *> blocked_workers;
|
|
/** A hash map of the objects that are available in the local Plasma store.
|
|
* The key is the object ID. This information could be a little stale. */
|
|
std::unordered_map<ObjectID, ObjectEntry> local_objects;
|
|
/** A hash map of the objects that are not available locally. These are
|
|
* currently being fetched by this local scheduler. The key is the object
|
|
* ID. Every local_scheduler_fetch_timeout_milliseconds, a Plasma fetch
|
|
* request will be sent the object IDs in this table. Each entry also holds
|
|
* an array of queued tasks that are dependent on it. */
|
|
std::unordered_map<ObjectID, ObjectEntry> remote_objects;
|
|
};
|
|
|
|
/**
 * Allocate a scheduling algorithm state whose waiting and dispatch task
 * queues are freshly allocated and empty. The caller owns the result and
 * releases it with SchedulingAlgorithmState_free.
 */
SchedulingAlgorithmState *SchedulingAlgorithmState_init(void) {
  auto *fresh_state = new SchedulingAlgorithmState();
  fresh_state->waiting_task_queue = new std::list<TaskExecutionSpec>();
  fresh_state->dispatch_task_queue = new std::list<TaskExecutionSpec>();
  return fresh_state;
}
|
|
|
|
/**
 * Release everything owned by the scheduling algorithm state: both task
 * queues (and any tasks still in them), every remaining local actor entry,
 * and finally the state object itself.
 */
void SchedulingAlgorithmState_free(SchedulingAlgorithmState *algorithm_state) {
  /* Deleting a queue also destroys the tasks it still holds. */
  delete algorithm_state->waiting_task_queue;
  delete algorithm_state->dispatch_task_queue;

  /* remove_actor erases the entry it is given, so keep taking whichever actor
   * is first until the table is empty. */
  auto &actor_table = algorithm_state->local_actor_infos;
  while (!actor_table.empty()) {
    const ActorID next_actor = actor_table.begin()->first;
    remove_actor(algorithm_state, next_actor);
  }

  delete algorithm_state;
}
|
|
|
|
/**
|
|
* This is a helper method to check if a worker is in a vector of workers.
|
|
*
|
|
* @param worker_vector A vector of workers.
|
|
* @param The worker to look for in the vector.
|
|
* @return True if the worker is in the vector and false otherwise.
|
|
*/
|
|
bool worker_in_vector(std::vector<LocalSchedulerClient *> &worker_vector,
|
|
LocalSchedulerClient *worker) {
|
|
auto it = std::find(worker_vector.begin(), worker_vector.end(), worker);
|
|
return it != worker_vector.end();
|
|
}
|
|
|
|
/**
|
|
* This is a helper method to remove a worker from a vector of workers if it is
|
|
* present in the vector.
|
|
*
|
|
* @param worker_vector A vector of workers.
|
|
* @param The worker to remove.
|
|
* @return True if the worker was removed and false otherwise.
|
|
*/
|
|
bool remove_worker_from_vector(
|
|
std::vector<LocalSchedulerClient *> &worker_vector,
|
|
LocalSchedulerClient *worker) {
|
|
/* Find the worker in the list of executing workers. */
|
|
auto it = std::find(worker_vector.begin(), worker_vector.end(), worker);
|
|
bool remove_worker = (it != worker_vector.end());
|
|
if (remove_worker) {
|
|
/* Remove the worker from the list of workers. */
|
|
using std::swap;
|
|
swap(*it, worker_vector.back());
|
|
worker_vector.pop_back();
|
|
}
|
|
return remove_worker;
|
|
}
|
|
|
|
/**
 * Fill in a LocalSchedulerInfo snapshot for this local scheduler.
 *
 * The snapshot contains:
 * - the total and available worker counts;
 * - the combined length of the waiting and dispatch queues;
 * - copies of the static and dynamic resource information.
 *
 * @param state The state of the local scheduler.
 * @param algorithm_state The state of the scheduling algorithm.
 * @param info Output; overwritten field by field.
 */
void provide_scheduler_info(LocalSchedulerState *state,
                            SchedulingAlgorithmState *algorithm_state,
                            LocalSchedulerInfo *info) {
  /* TODO(swang): Provide separate counts for tasks that are waiting for
   * dependencies vs tasks that are waiting to be assigned. */
  const int64_t num_waiting = algorithm_state->waiting_task_queue->size();
  const int64_t num_dispatchable =
      algorithm_state->dispatch_task_queue->size();

  info->total_num_workers = state->workers.size();
  info->task_queue_length = num_waiting + num_dispatchable;
  info->available_workers = algorithm_state->available_workers.size();
  info->dynamic_resources = state->dynamic_resources;
  info->static_resources = state->static_resources;
}
|
|
|
|
/**
|
|
* Create the LocalActorInfo struct for an actor worker that this local
|
|
* scheduler is responsible for. For a given actor, this will either be done
|
|
* when the first task for that actor arrives or when the worker running that
|
|
* actor connects to the local scheduler.
|
|
*
|
|
* @param algorithm_state The state of the scheduling algorithm.
|
|
* @param actor_id The actor ID of the actor being created.
|
|
* @param initial_execution_dependency The dummy object ID of the actor
|
|
* creation task.
|
|
* @param worker The worker struct for the worker that is running this actor.
|
|
* If the worker struct has not been created yet (meaning that the worker
|
|
* that is running this actor has not registered with the local scheduler
|
|
* yet, and so create_actor is being called because a task for that actor
|
|
* has arrived), then this should be NULL.
|
|
* @return Void.
|
|
*/
|
|
void create_actor(SchedulingAlgorithmState *algorithm_state,
|
|
const ActorID &actor_id,
|
|
const ObjectID &initial_execution_dependency,
|
|
LocalSchedulerClient *worker) {
|
|
LocalActorInfo entry;
|
|
entry.task_counters[ActorHandleID::nil()] = 0;
|
|
entry.frontier_dependencies[ActorHandleID::nil()] = ObjectID::nil();
|
|
/* The actor has not yet executed any tasks, so there are no execution
|
|
* dependencies for the next task to be scheduled. */
|
|
entry.execution_dependency = initial_execution_dependency;
|
|
entry.task_queue = new std::list<TaskExecutionSpec>();
|
|
entry.worker = worker;
|
|
entry.worker_available = false;
|
|
RAY_CHECK(algorithm_state->local_actor_infos.count(actor_id) == 0);
|
|
algorithm_state->local_actor_infos[actor_id] = entry;
|
|
|
|
/* Log some useful information about the actor that we created. */
|
|
RAY_LOG(DEBUG) << "Creating actor with ID " << actor_id;
|
|
}
|
|
|
|
/**
 * Remove an actor's LocalActorInfo entry. The actor must have an entry.
 *
 * Any tasks still queued for the actor are discarded, and a warning is logged
 * if there were any. The actor is also dropped from the set of actors with
 * pending tasks.
 *
 * @param algorithm_state The state of the scheduling algorithm.
 * @param actor_id The actor to remove.
 */
void remove_actor(SchedulingAlgorithmState *algorithm_state, ActorID actor_id) {
  auto &actor_table = algorithm_state->local_actor_infos;
  auto actor_it = actor_table.find(actor_id);
  RAY_CHECK(actor_it != actor_table.end());

  std::list<TaskExecutionSpec> *pending = actor_it->second.task_queue;
  const size_t num_pending = pending->size();
  if (num_pending > 0) {
    RAY_LOG(WARNING) << "Removing actor with ID " << actor_id << " and "
                     << num_pending << " remaining tasks.";
  }

  /* Deleting the queue also destroys every task left in it. */
  delete pending;
  actor_table.erase(actor_it);

  algorithm_state->actors_with_pending_tasks.erase(actor_id);
}
|
|
|
|
/**
|
|
* Dispatch a task to an actor if possible.
|
|
*
|
|
* @param state The state of the local scheduler.
|
|
* @param algorithm_state The state of the scheduling algorithm.
|
|
* @param actor_id The ID of the actor corresponding to the worker.
|
|
* @return True if a task was dispatched to the actor and false otherwise.
|
|
*/
|
|
bool dispatch_actor_task(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
ActorID actor_id) {
|
|
/* Make sure this worker actually is an actor. */
|
|
RAY_CHECK(!actor_id.is_nil());
|
|
/* Return if this actor doesn't have any pending tasks. */
|
|
if (algorithm_state->actors_with_pending_tasks.find(actor_id) ==
|
|
algorithm_state->actors_with_pending_tasks.end()) {
|
|
return false;
|
|
}
|
|
/* Make sure this actor belongs to this local scheduler. */
|
|
if (state->actor_mapping.count(actor_id) != 1) {
|
|
/* The creation notification for this actor has not yet arrived at the local
|
|
* scheduler. This should be rare. */
|
|
return false;
|
|
}
|
|
RAY_CHECK(state->actor_mapping[actor_id].local_scheduler_id ==
|
|
get_db_client_id(state->db));
|
|
|
|
/* Get the local actor entry for this actor. */
|
|
RAY_CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0);
|
|
LocalActorInfo &entry =
|
|
algorithm_state->local_actor_infos.find(actor_id)->second;
|
|
|
|
/* There should be some queued tasks for this actor. */
|
|
RAY_CHECK(!entry.task_queue->empty());
|
|
/* If the worker is not available, we cannot assign a task to it. */
|
|
if (!entry.worker_available) {
|
|
return false;
|
|
}
|
|
|
|
/* Check whether we can execute the first task in the queue. */
|
|
auto task = entry.task_queue->begin();
|
|
TaskSpec *spec = task->Spec();
|
|
ActorHandleID next_task_handle_id = TaskSpec_actor_handle_id(spec);
|
|
/* We can only execute tasks in order of task_counter. */
|
|
if (TaskSpec_actor_counter(spec) !=
|
|
entry.task_counters[next_task_handle_id]) {
|
|
return false;
|
|
}
|
|
|
|
/* If there are not enough resources available, we cannot assign the task. */
|
|
RAY_CHECK(0 == TaskSpec_get_required_resource(spec, "GPU"));
|
|
if (!check_dynamic_resources(state, TaskSpec_get_required_resources(spec))) {
|
|
return false;
|
|
}
|
|
|
|
/* Update the task's execution dependencies to reflect the actual execution
|
|
* order to support deterministic reconstruction. */
|
|
/* NOTE(swang): The update of an actor task's execution dependencies is
|
|
* performed asynchronously. This means that if this local scheduler dies, we
|
|
* may lose updates that are in flight to the task table. We only guarantee
|
|
* deterministic reconstruction ordering for tasks whose updates are
|
|
* reflected in the task table. */
|
|
std::vector<ObjectID> ordered_execution_dependencies;
|
|
ordered_execution_dependencies.push_back(entry.execution_dependency);
|
|
task->SetExecutionDependencies(ordered_execution_dependencies);
|
|
|
|
/* Assign the first task in the task queue to the worker and mark the worker
|
|
* as unavailable. */
|
|
assign_task_to_worker(state, *task, entry.worker);
|
|
entry.execution_dependency = TaskSpec_actor_dummy_object(spec);
|
|
entry.worker_available = false;
|
|
/* Extend the frontier to include the assigned task. */
|
|
entry.task_counters[next_task_handle_id] += 1;
|
|
entry.frontier_dependencies[next_task_handle_id] = entry.execution_dependency;
|
|
|
|
/* Remove the task from the actor's task queue. */
|
|
entry.task_queue->erase(task);
|
|
/* If there are no more tasks in the queue, then indicate that the actor has
|
|
* no tasks. */
|
|
if (entry.task_queue->empty()) {
|
|
algorithm_state->actors_with_pending_tasks.erase(actor_id);
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
/**
 * Record that `worker` is now running the actor `actor_id`.
 *
 * If the actor has no LocalActorInfo yet, one is created with the given
 * initial execution dependency. If one already exists (for example, because
 * a task for the actor was queued before the worker connected), only its
 * worker field is filled in. In both cases the creator handle's task counter
 * goes from 0 to 1 to account for the actor creation task.
 */
void handle_convert_worker_to_actor(
    LocalSchedulerState *state,
    SchedulingAlgorithmState *algorithm_state,
    const ActorID &actor_id,
    const ObjectID &initial_execution_dependency,
    LocalSchedulerClient *worker) {
  auto &actor_table = algorithm_state->local_actor_infos;
  auto existing = actor_table.find(actor_id);
  if (existing != actor_table.end()) {
    /* The entry was created without a worker; attach the worker now. */
    existing->second.worker = worker;
  } else {
    create_actor(algorithm_state, actor_id, initial_execution_dependency,
                 worker);
  }

  /* Count the actor creation task against the creator's (nil) handle. */
  int64_t &creator_counter =
      actor_table[actor_id].task_counters[ActorHandleID::nil()];
  RAY_CHECK(creator_counter == 0);
  creator_counter++;
}
|
|
|
|
/**
|
|
* Finishes a killed task by inserting dummy objects for each of its returns.
|
|
*/
|
|
void finish_killed_task(LocalSchedulerState *state,
|
|
TaskExecutionSpec &execution_spec) {
|
|
TaskSpec *spec = execution_spec.Spec();
|
|
int64_t num_returns = TaskSpec_num_returns(spec);
|
|
for (int i = 0; i < num_returns; i++) {
|
|
ObjectID object_id = TaskSpec_return(spec, i);
|
|
std::shared_ptr<Buffer> data;
|
|
// TODO(ekl): this writes an invalid arrow object, which is sufficient to
|
|
// signal that the worker failed, but it would be nice to return more
|
|
// detailed failure metadata in the future.
|
|
arrow::Status status =
|
|
state->plasma_conn->Create(object_id.to_plasma_id(), 1, NULL, 0, &data);
|
|
if (!status.IsPlasmaObjectExists()) {
|
|
ARROW_CHECK_OK(status);
|
|
ARROW_CHECK_OK(state->plasma_conn->Seal(object_id.to_plasma_id()));
|
|
}
|
|
}
|
|
/* Mark the task as done. */
|
|
if (state->db != NULL) {
|
|
Task *task = Task_alloc(execution_spec, TaskStatus::DONE,
|
|
get_db_client_id(state->db));
|
|
#if !RAY_USE_NEW_GCS
|
|
// In most cases, task_table_update would be appropriate, however, it is
|
|
// possible in some cases that the task has not yet been added to the task
|
|
// table (e.g., if it is an actor task that is queued locally because the
|
|
// actor has not been created yet).
|
|
task_table_add_task(state->db, task, NULL, NULL, NULL);
|
|
#else
|
|
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
|
|
Task_free(task);
|
|
#endif
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Insert a task queue entry into an actor's dispatch queue. The task is
|
|
* inserted in sorted order by task counter. If this is the first task
|
|
* scheduled to this actor and the worker process has not yet connected, then
|
|
* this also creates a LocalActorInfo entry for the actor.
|
|
*
|
|
* @param state The state of the local scheduler.
|
|
* @param algorithm_state The state of the scheduling algorithm.
|
|
* @param task_entry The task queue entry to add to the actor's queue.
|
|
* @return Void.
|
|
*/
|
|
void insert_actor_task_queue(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
TaskExecutionSpec task_entry) {
|
|
TaskSpec *spec = task_entry.Spec();
|
|
/* Get the local actor entry for this actor. */
|
|
ActorID actor_id = TaskSpec_actor_id(spec);
|
|
ActorHandleID task_handle_id = TaskSpec_actor_handle_id(spec);
|
|
int64_t task_counter = TaskSpec_actor_counter(spec);
|
|
|
|
/* Fail the task immediately; it's destined for a dead actor. */
|
|
if (state->removed_actors.find(actor_id) != state->removed_actors.end()) {
|
|
finish_killed_task(state, task_entry);
|
|
return;
|
|
}
|
|
|
|
LocalActorInfo &entry =
|
|
algorithm_state->local_actor_infos.find(actor_id)->second;
|
|
if (entry.task_counters.count(task_handle_id) == 0) {
|
|
entry.task_counters[task_handle_id] = 0;
|
|
}
|
|
/* Extend the frontier to include the new handle. */
|
|
if (entry.frontier_dependencies.count(task_handle_id) == 0) {
|
|
RAY_CHECK(task_entry.ExecutionDependencies().size() == 1);
|
|
entry.frontier_dependencies[task_handle_id] =
|
|
task_entry.ExecutionDependencies()[1];
|
|
}
|
|
|
|
/* As a sanity check, the counter of the new task should be greater than the
|
|
* number of tasks that have executed on this actor so far (since we are
|
|
* guaranteeing in-order execution of the tasks on the actor). TODO(rkn): This
|
|
* check will fail if the fault-tolerance mechanism resubmits a task on an
|
|
* actor. */
|
|
if (task_counter < entry.task_counters[task_handle_id]) {
|
|
RAY_LOG(INFO) << "A task that has already been executed has been "
|
|
<< "resubmitted, so we are ignoring it. This should only "
|
|
<< "happen during reconstruction.";
|
|
return;
|
|
}
|
|
|
|
/* Insert the task spec to the actor's task queue in sorted order, per actor
|
|
* handle ID. Find the first task in the queue with a counter greater than
|
|
* the submitted task's and the same handle ID. */
|
|
auto it = entry.task_queue->begin();
|
|
for (; it != entry.task_queue->end(); it++) {
|
|
TaskSpec *pending_task_spec = it->Spec();
|
|
/* Skip tasks submitted by a different handle. */
|
|
if (!(task_handle_id == TaskSpec_actor_handle_id(pending_task_spec))) {
|
|
continue;
|
|
}
|
|
/* A duplicate task submitted by the same handle. */
|
|
if (task_counter == TaskSpec_actor_counter(pending_task_spec)) {
|
|
RAY_LOG(INFO) << "A task was resubmitted, so we are ignoring it. This "
|
|
<< "should only happen during reconstruction.";
|
|
return;
|
|
}
|
|
/* We found a task with the same handle ID and a greater task counter. */
|
|
if (task_counter < TaskSpec_actor_counter(pending_task_spec)) {
|
|
break;
|
|
}
|
|
}
|
|
entry.task_queue->insert(it, std::move(task_entry));
|
|
|
|
/* Record the fact that this actor has a task waiting to execute. */
|
|
algorithm_state->actors_with_pending_tasks.insert(actor_id);
|
|
}
|
|
|
|
/**
|
|
* Queue a task to be dispatched for an actor. Update the task table for the
|
|
* queued task. TODO(rkn): Should we also update the task table in the case
|
|
* where the tasks are cached locally?
|
|
*
|
|
* @param state The state of the local scheduler.
|
|
* @param algorithm_state The state of the scheduling algorithm.
|
|
* @param spec The task spec to add.
|
|
* @param from_global_scheduler True if the task was assigned to this local
|
|
* scheduler by the global scheduler and false if it was submitted
|
|
* locally by a worker.
|
|
* @return Void.
|
|
*/
|
|
void queue_actor_task(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
TaskExecutionSpec &execution_spec,
|
|
bool from_global_scheduler) {
|
|
TaskSpec *spec = execution_spec.Spec();
|
|
ActorID actor_id = TaskSpec_actor_id(spec);
|
|
RAY_CHECK(!actor_id.is_nil());
|
|
|
|
/* Update the task table. */
|
|
if (state->db != NULL) {
|
|
Task *task = Task_alloc(execution_spec, TaskStatus::QUEUED,
|
|
get_db_client_id(state->db));
|
|
if (from_global_scheduler) {
|
|
/* If the task is from the global scheduler, it's already been added to
|
|
* the task table, so just update the entry. */
|
|
#if !RAY_USE_NEW_GCS
|
|
task_table_update(state->db, task, NULL, NULL, NULL);
|
|
#else
|
|
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
|
|
Task_free(task);
|
|
#endif
|
|
} else {
|
|
/* Otherwise, this is the first time the task has been seen in the
|
|
* system (unless it's a resubmission of a previous task), so add the
|
|
* entry. */
|
|
#if !RAY_USE_NEW_GCS
|
|
task_table_add_task(state->db, task, NULL, NULL, NULL);
|
|
#else
|
|
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
|
|
Task_free(task);
|
|
#endif
|
|
}
|
|
}
|
|
|
|
// Create a new task queue entry. This must come after the above block because
|
|
// insert_actor_task_queue may call task_table_update internally, which must
|
|
// come after the prior call to task_table_add_task.
|
|
TaskExecutionSpec copy = TaskExecutionSpec(&execution_spec);
|
|
insert_actor_task_queue(state, algorithm_state, std::move(copy));
|
|
}
|
|
|
|
/**
|
|
* Fetch a queued task's missing object dependency. The fetch request will be
|
|
* retried every local_scheduler_fetch_timeout_milliseconds until the object is
|
|
* available locally.
|
|
*
|
|
* @param state The scheduler state.
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
* @param task_entry_it A reference to the task entry in the waiting queue.
|
|
* @param obj_id The ID of the object that the task is dependent on.
|
|
* @param request_transfer Whether to request a transfer of this object from
|
|
* other plasma managers. This should be set to false for execution
|
|
* dependencies, which should be fulfilled by executing the
|
|
* corresponding task locally.
|
|
* @returns Void.
|
|
*/
|
|
void fetch_missing_dependency(
|
|
LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
std::list<TaskExecutionSpec>::iterator task_entry_it,
|
|
plasma::ObjectID obj_id,
|
|
bool request_transfer) {
|
|
if (algorithm_state->remote_objects.count(obj_id) == 0) {
|
|
/* We weren't actively fetching this object. Try the fetch once
|
|
* immediately. */
|
|
if (state->plasma_conn->get_manager_fd() != -1) {
|
|
auto arrow_status = state->plasma_conn->Fetch(1, &obj_id);
|
|
if (!arrow_status.ok()) {
|
|
LocalSchedulerState_free(state);
|
|
/* TODO(swang): Local scheduler should also exit even if there are no
|
|
* pending fetches. This could be done by subscribing to the db_client
|
|
* table, or pinging the plasma manager in the heartbeat handler. */
|
|
RAY_LOG(FATAL) << "Lost connection to the plasma manager, local "
|
|
<< "scheduler is exiting. Error: "
|
|
<< arrow_status.ToString();
|
|
}
|
|
}
|
|
/* Create an entry and add it to the list of active fetch requests to
|
|
* ensure that the fetch actually happens. The entry will be moved to the
|
|
* hash table of locally available objects in handle_object_available when
|
|
* the object becomes available locally. It will get freed if the object is
|
|
* subsequently removed locally. */
|
|
ObjectEntry entry;
|
|
entry.request_transfer = request_transfer;
|
|
algorithm_state->remote_objects[obj_id] = entry;
|
|
}
|
|
algorithm_state->remote_objects[obj_id].dependent_tasks.push_back(
|
|
task_entry_it);
|
|
}
|
|
|
|
/**
|
|
* Fetch a queued task's missing object dependencies. The fetch requests will
|
|
* be retried every local_scheduler_fetch_timeout_milliseconds until all
|
|
* objects are available locally.
|
|
*
|
|
* @param state The scheduler state.
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
* @param task_entry_it A reference to the task entry in the waiting queue.
|
|
* @returns Void.
|
|
*/
|
|
void fetch_missing_dependencies(
|
|
LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
std::list<TaskExecutionSpec>::iterator task_entry_it) {
|
|
int64_t num_dependencies = task_entry_it->NumDependencies();
|
|
int num_missing_dependencies = 0;
|
|
for (int64_t i = 0; i < num_dependencies; ++i) {
|
|
int count = task_entry_it->DependencyIdCount(i);
|
|
for (int j = 0; j < count; ++j) {
|
|
ObjectID obj_id = task_entry_it->DependencyId(i, j);
|
|
/* If the entry is not yet available locally, record the dependency. */
|
|
if (algorithm_state->local_objects.count(obj_id) == 0) {
|
|
/* Do not request a transfer from other plasma managers if this is an
|
|
* execution dependency. */
|
|
bool request_transfer = task_entry_it->IsStaticDependency(i);
|
|
fetch_missing_dependency(state, algorithm_state, task_entry_it,
|
|
obj_id.to_plasma_id(), request_transfer);
|
|
++num_missing_dependencies;
|
|
}
|
|
}
|
|
}
|
|
RAY_CHECK(num_missing_dependencies > 0);
|
|
}
|
|
|
|
/**
|
|
* Clear a queued task's missing object dependencies. This is the inverse of
|
|
* fetch_missing_dependencies.
|
|
* TODO(swang): Test this function.
|
|
*
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
* @param task_entry_it A reference to the task entry in the waiting queue.
|
|
* @returns Void.
|
|
*/
|
|
void clear_missing_dependencies(
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
std::list<TaskExecutionSpec>::iterator task_entry_it) {
|
|
int64_t num_dependencies = task_entry_it->NumDependencies();
|
|
for (int64_t i = 0; i < num_dependencies; ++i) {
|
|
int count = task_entry_it->DependencyIdCount(i);
|
|
for (int j = 0; j < count; ++j) {
|
|
ObjectID obj_id = task_entry_it->DependencyId(i, j);
|
|
/* If this object dependency is missing, remove this task from the
|
|
* object's list of dependent tasks. */
|
|
auto entry = algorithm_state->remote_objects.find(obj_id);
|
|
if (entry != algorithm_state->remote_objects.end()) {
|
|
/* Find and remove the given task. */
|
|
auto &dependent_tasks = entry->second.dependent_tasks;
|
|
for (auto dependent_task_it = dependent_tasks.begin();
|
|
dependent_task_it != dependent_tasks.end();) {
|
|
if (*dependent_task_it == task_entry_it) {
|
|
dependent_task_it = dependent_tasks.erase(dependent_task_it);
|
|
} else {
|
|
dependent_task_it++;
|
|
}
|
|
}
|
|
/* If the missing object dependency has no more dependent tasks, then
|
|
* remove it. */
|
|
if (dependent_tasks.empty()) {
|
|
algorithm_state->remote_objects.erase(entry);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Check if all of the remote object arguments for a task are available in the
|
|
* local object store.
|
|
*
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
* @param task Task specification of the task to check.
|
|
* @return bool This returns true if all of the remote object arguments for the
|
|
* task are present in the local object store, otherwise it returns
|
|
* false.
|
|
*/
|
|
bool can_run(SchedulingAlgorithmState *algorithm_state,
|
|
TaskExecutionSpec &task) {
|
|
int64_t num_dependencies = task.NumDependencies();
|
|
for (int i = 0; i < num_dependencies; ++i) {
|
|
int count = task.DependencyIdCount(i);
|
|
for (int j = 0; j < count; ++j) {
|
|
ObjectID obj_id = task.DependencyId(i, j);
|
|
if (algorithm_state->local_objects.count(obj_id) == 0) {
|
|
/* The object is not present locally, so this task cannot be scheduled
|
|
* right now. */
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
return true;
|
|
}
|
|
|
|
/**
 * Return true if object_id is in the table of locally available objects.
 * That table may be slightly stale relative to the Plasma store.
 */
bool object_locally_available(SchedulingAlgorithmState *algorithm_state,
                              ObjectID object_id) {
  const auto &local_objects = algorithm_state->local_objects;
  return local_objects.find(object_id) != local_objects.end();
}
|
|
|
|
/* TODO(swang): This method is not covered by any valgrind tests. */
|
|
int fetch_object_timeout_handler(event_loop *loop, timer_id id, void *context) {
|
|
int64_t start_time = current_time_ms();
|
|
|
|
LocalSchedulerState *state = (LocalSchedulerState *) context;
|
|
/* Only try the fetches if we are connected to the object store manager. */
|
|
if (state->plasma_conn->get_manager_fd() == -1) {
|
|
RAY_LOG(INFO)
|
|
<< "Local scheduler is not connected to a object store manager";
|
|
return RayConfig::instance().local_scheduler_fetch_timeout_milliseconds();
|
|
}
|
|
|
|
std::vector<ObjectID> object_id_vec;
|
|
for (auto const &entry : state->algorithm_state->remote_objects) {
|
|
if (entry.second.request_transfer) {
|
|
object_id_vec.push_back(entry.first);
|
|
}
|
|
}
|
|
|
|
ObjectID *object_ids = object_id_vec.data();
|
|
int64_t num_object_ids = object_id_vec.size();
|
|
|
|
/* Divide very large fetch requests into smaller fetch requests so that a
|
|
* single fetch request doesn't block the plasma manager for a long time. */
|
|
for (int64_t j = 0; j < num_object_ids;
|
|
j += RayConfig::instance().local_scheduler_fetch_request_size()) {
|
|
int num_objects_in_request =
|
|
std::min(
|
|
num_object_ids,
|
|
j + RayConfig::instance().local_scheduler_fetch_request_size()) -
|
|
j;
|
|
auto arrow_status = state->plasma_conn->Fetch(
|
|
num_objects_in_request,
|
|
reinterpret_cast<plasma::ObjectID *>(&object_ids[j]));
|
|
if (!arrow_status.ok()) {
|
|
LocalSchedulerState_free(state);
|
|
RAY_LOG(FATAL) << "Lost connection to the plasma manager, local "
|
|
<< "scheduler is exiting. Error: "
|
|
<< arrow_status.ToString();
|
|
}
|
|
}
|
|
|
|
/* Print a warning if this method took too long. */
|
|
int64_t end_time = current_time_ms();
|
|
if (end_time - start_time >
|
|
RayConfig::instance().max_time_for_handler_milliseconds()) {
|
|
RAY_LOG(WARNING) << "fetch_object_timeout_handler took "
|
|
<< end_time - start_time << " milliseconds.";
|
|
}
|
|
|
|
/* Wait at least local_scheduler_fetch_timeout_milliseconds before running
|
|
* this timeout handler again. But if we're waiting for a large number of
|
|
* objects, wait longer (e.g., 10 seconds for one million objects) so that we
|
|
* don't overwhelm the plasma manager. */
|
|
return std::max(
|
|
RayConfig::instance().local_scheduler_fetch_timeout_milliseconds(),
|
|
int64_t(0.01 * num_object_ids));
|
|
}
|
|
|
|
/* TODO(swang): This method is not covered by any valgrind tests. */
|
|
int reconstruct_object_timeout_handler(event_loop *loop,
|
|
timer_id id,
|
|
void *context) {
|
|
int64_t start_time = current_time_ms();
|
|
|
|
LocalSchedulerState *state = (LocalSchedulerState *) context;
|
|
|
|
/* This vector is used to track which object IDs to reconstruct next. If the
|
|
* vector is empty, we repopulate it with all of the keys of the remote object
|
|
* table. During every pass through this handler, we call reconstruct on up to
|
|
* max_num_to_reconstruct elements of the vector (after first checking that
|
|
* the object IDs are still missing). */
|
|
static std::vector<ObjectID> object_ids_to_reconstruct;
|
|
|
|
/* If the set is empty, repopulate it. */
|
|
if (object_ids_to_reconstruct.size() == 0) {
|
|
for (auto const &entry : state->algorithm_state->remote_objects) {
|
|
object_ids_to_reconstruct.push_back(entry.first);
|
|
}
|
|
}
|
|
|
|
int64_t num_reconstructed = 0;
|
|
for (size_t i = 0; i < object_ids_to_reconstruct.size(); i++) {
|
|
ObjectID object_id = object_ids_to_reconstruct[i];
|
|
/* Only call reconstruct if we are still missing the object. */
|
|
if (state->algorithm_state->remote_objects.find(object_id) !=
|
|
state->algorithm_state->remote_objects.end()) {
|
|
reconstruct_object(state, object_id);
|
|
}
|
|
num_reconstructed++;
|
|
if (num_reconstructed == RayConfig::instance().max_num_to_reconstruct()) {
|
|
break;
|
|
}
|
|
}
|
|
object_ids_to_reconstruct.erase(
|
|
object_ids_to_reconstruct.begin(),
|
|
object_ids_to_reconstruct.begin() + num_reconstructed);
|
|
|
|
/* Print a warning if this method took too long. */
|
|
int64_t end_time = current_time_ms();
|
|
if (end_time - start_time >
|
|
RayConfig::instance().max_time_for_handler_milliseconds()) {
|
|
RAY_LOG(WARNING) << "reconstruct_object_timeout_handler took "
|
|
<< end_time - start_time << " milliseconds.";
|
|
}
|
|
|
|
return RayConfig::instance()
|
|
.local_scheduler_reconstruction_timeout_milliseconds();
|
|
}
|
|
|
|
/**
 * Timer callback that requests reconstruction of the actor creation tasks for
 * every actor that still has cached (unplaced) actor tasks.
 *
 * The distinct actor creation dummy object IDs of the tasks in
 * cached_submitted_actor_tasks are collected, and reconstruct_object is called
 * once per ID.
 *
 * @param loop Unused.
 * @param id Unused.
 * @param context The LocalSchedulerState, passed as an opaque pointer.
 * @return Milliseconds to wait before this handler runs again.
 */
int rerun_actor_creation_tasks_timeout_handler(event_loop *loop,
                                               timer_id id,
                                               void *context) {
  int64_t start_time = current_time_ms();

  LocalSchedulerState *state = (LocalSchedulerState *) context;

  // Collect the dummy object IDs of the actor creation tasks to reconstruct.
  // The set deduplicates, so each actor is reconstructed at most once per
  // pass, even when many of its tasks are cached.
  std::unordered_set<ObjectID> actor_dummy_objects;
  for (auto const &execution_spec :
       state->algorithm_state->cached_submitted_actor_tasks) {
    ObjectID actor_creation_dummy_object_id =
        TaskSpec_actor_creation_dummy_object_id(execution_spec.Spec());
    actor_dummy_objects.insert(actor_creation_dummy_object_id);
  }

  // Issue reconstruct calls.
  for (auto const &object_id : actor_dummy_objects) {
    reconstruct_object(state, object_id);
  }

  // Print a warning if this method took too long, naming this handler so the
  // log points at the right place.
  int64_t end_time = current_time_ms();
  if (end_time - start_time >
      RayConfig::instance().max_time_for_handler_milliseconds()) {
    RAY_LOG(WARNING) << "rerun_actor_creation_tasks_timeout_handler took "
                     << end_time - start_time << " milliseconds.";
  }

  return RayConfig::instance()
      .local_scheduler_reconstruction_timeout_milliseconds();
}
|
|
|
|
/**
|
|
* Return true if there are still some resources available and false otherwise.
|
|
*
|
|
* @param state The scheduler state.
|
|
* @return True if there are still some resources and false if there are not.
|
|
*/
|
|
bool resources_available(LocalSchedulerState *state) {
|
|
bool resources_available = false;
|
|
for (auto const &resource_pair : state->dynamic_resources) {
|
|
if (resource_pair.second > 0) {
|
|
resources_available = true;
|
|
}
|
|
}
|
|
return resources_available;
|
|
}
|
|
|
|
/**
 * Hand up to max_tasks_to_spillback() tasks from the tail of the dispatch
 * queue to give_task_to_global_scheduler and remove them from the queue.
 *
 * Each spilled task's spillback count is incremented. When that count is a
 * multiple of actor_creation_num_spillbacks_warning() and the task is an actor
 * creation task, an ACTOR_NOT_CREATED error is pushed to the task's driver.
 *
 * @param state The scheduler state.
 * @return Void.
 */
void spillback_tasks_handler(LocalSchedulerState *state) {
  SchedulingAlgorithmState *algorithm_state = state->algorithm_state;
  std::list<TaskExecutionSpec> *dispatch_queue =
      algorithm_state->dispatch_task_queue;

  int64_t const num_to_spillback = std::min(
      static_cast<int64_t>(dispatch_queue->size()),
      RayConfig::instance().max_tasks_to_spillback());

  /* Position the iterator on the first of the last num_to_spillback tasks. */
  auto it = dispatch_queue->end();
  for (int64_t step = 0; step < num_to_spillback; ++step) {
    --it;
  }

  for (int64_t remaining = num_to_spillback; remaining > 0; --remaining) {
    it->IncrementSpillbackCount();

    /* Periodically tell the driver about actors that keep being spilled back
     * without getting created. */
    bool const warn_now =
        it->SpillbackCount() %
            RayConfig::instance().actor_creation_num_spillbacks_warning() ==
        0;
    TaskSpec *spec = it->Spec();
    if (warn_now && TaskSpec_is_actor_creation_task(spec)) {
      std::ostringstream error_message;
      error_message << "The actor with ID "
                    << TaskSpec_actor_creation_id(spec) << " is taking a "
                    << "while to be created. It is possible that the "
                    << "cluster does not have enough resources to place this "
                    << "actor (this may be normal while an autoscaling "
                    << "is scaling up). Consider reducing the number of "
                    << "actors created, or "
                    << "increasing the number of slots available by using "
                    << "the --num-cpus, --num-gpus, and --resources flags. "
                    << "The actor creation task is requesting ";
      for (auto const &requested : TaskSpec_get_required_resources(spec)) {
        error_message << requested.second << " " << requested.first << " ";
      }
      push_error(state->db, TaskSpec_driver_id(spec),
                 ErrorIndex::ACTOR_NOT_CREATED, error_message.str());
    }

    give_task_to_global_scheduler(state, algorithm_state, *it);
    /* Drop the spilled task; erase yields the next task to process. */
    it = dispatch_queue->erase(it);
  }
}
|
|
|
|
/**
|
|
* Assign as many tasks from the dispatch queue as possible.
|
|
*
|
|
* @param state The scheduler state.
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
* @return Void.
|
|
*/
|
|
void dispatch_tasks(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state) {
|
|
/* Assign as many tasks as we can, while there are workers available. */
|
|
for (auto it = algorithm_state->dispatch_task_queue->begin();
|
|
it != algorithm_state->dispatch_task_queue->end();) {
|
|
TaskSpec *spec = it->Spec();
|
|
/* If there is a task to assign, but there are no more available workers in
|
|
* the worker pool, then exit. Ensure that there will be an available
|
|
* worker during a future invocation of dispatch_tasks. */
|
|
if (algorithm_state->available_workers.size() == 0) {
|
|
if (state->child_pids.size() == 0) {
|
|
/* If there are no workers, including those pending PID registration,
|
|
* then we must start a new one to replenish the worker pool. */
|
|
start_worker(state);
|
|
}
|
|
return;
|
|
}
|
|
|
|
/* Terminate early if there are no more resources available. */
|
|
if (!resources_available(state)) {
|
|
return;
|
|
}
|
|
|
|
/* Skip to the next task if this task cannot currently be satisfied. */
|
|
if (!check_dynamic_resources(state,
|
|
TaskSpec_get_required_resources(spec))) {
|
|
/* This task could not be satisfied -- proceed to the next task. */
|
|
++it;
|
|
continue;
|
|
}
|
|
|
|
/* Dispatch this task to an available worker and dequeue the task. */
|
|
RAY_LOG(DEBUG) << "Dispatching task";
|
|
/* Get the last available worker in the available worker queue. */
|
|
LocalSchedulerClient *worker = algorithm_state->available_workers.back();
|
|
/* Tell the available worker to execute the task. */
|
|
assign_task_to_worker(state, *it, worker);
|
|
/* Remove the worker from the available queue, and add it to the executing
|
|
* workers. */
|
|
algorithm_state->available_workers.pop_back();
|
|
algorithm_state->executing_workers.push_back(worker);
|
|
print_resource_info(state, spec);
|
|
/* Dequeue the task. */
|
|
it = algorithm_state->dispatch_task_queue->erase(it);
|
|
} /* End for each task in the dispatch queue. */
|
|
}
|
|
|
|
/**
|
|
* Attempt to dispatch both regular tasks and actor tasks.
|
|
*
|
|
* @param state The scheduler state.
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
* @return Void.
|
|
*/
|
|
void dispatch_all_tasks(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state) {
|
|
/* First attempt to dispatch regular tasks. */
|
|
dispatch_tasks(state, algorithm_state);
|
|
|
|
/* Attempt to dispatch actor tasks. */
|
|
auto it = algorithm_state->actors_with_pending_tasks.begin();
|
|
while (it != algorithm_state->actors_with_pending_tasks.end()) {
|
|
// We cannot short-circuit and exit here if there are no resources
|
|
// available because actor methods may require 0 CPUs.
|
|
|
|
/* We increment the iterator ahead of time because the call to
|
|
* dispatch_actor_task may invalidate the current iterator. */
|
|
ActorID actor_id = *it;
|
|
it++;
|
|
/* Dispatch tasks for the current actor. */
|
|
dispatch_actor_task(state, algorithm_state, actor_id);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* A helper function to allocate a queue entry for a task specification and
|
|
* push it onto a generic queue.
|
|
*
|
|
* @param state The state of the local scheduler.
|
|
* @param task_queue A pointer to a task queue. NOTE: Because we are using
|
|
* utlist.h, we must pass in a pointer to the queue we want to append
|
|
* to. If we passed in the queue itself and the queue was empty, this
|
|
* would append the task to a queue that we don't have a reference to.
|
|
* @param task_entry A pointer to the task entry to queue.
|
|
* @param from_global_scheduler Whether or not the task was from a global
|
|
* scheduler. If false, the task was submitted by a worker.
|
|
* @return A reference to the entry in the queue that was pushed.
|
|
*/
|
|
std::list<TaskExecutionSpec>::iterator queue_task(
|
|
LocalSchedulerState *state,
|
|
std::list<TaskExecutionSpec> *task_queue,
|
|
TaskExecutionSpec &task_entry,
|
|
bool from_global_scheduler) {
|
|
/* The task has been added to a local scheduler queue. Write the entry in the
|
|
* task table to notify others that we have queued it. */
|
|
if (state->db != NULL) {
|
|
Task *task =
|
|
Task_alloc(task_entry, TaskStatus::QUEUED, get_db_client_id(state->db));
|
|
#if !RAY_USE_NEW_GCS
|
|
if (from_global_scheduler) {
|
|
/* If the task is from the global scheduler, it's already been added to
|
|
* the task table, so just update the entry. */
|
|
task_table_update(state->db, task, NULL, NULL, NULL);
|
|
} else {
|
|
/* Otherwise, this is the first time the task has been seen in the system
|
|
* (unless it's a resubmission of a previous task), so add the entry. */
|
|
task_table_add_task(state->db, task, NULL, NULL, NULL);
|
|
}
|
|
#else
|
|
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
|
|
Task_free(task);
|
|
#endif
|
|
}
|
|
|
|
/* Copy the spec and add it to the task queue. The allocated spec will be
|
|
* freed when it is assigned to a worker. */
|
|
TaskExecutionSpec copy = TaskExecutionSpec(&task_entry);
|
|
task_queue->push_back(std::move(copy));
|
|
/* Since we just queued the task, we can get a reference to it by going to
|
|
* the last element in the queue. */
|
|
auto it = task_queue->end();
|
|
--it;
|
|
|
|
return it;
|
|
}
|
|
|
|
/**
|
|
* Queue a task whose dependencies are missing. When the task's object
|
|
* dependencies become available, the task will be moved to the dispatch queue.
|
|
* If we have a connection to a plasma manager, begin trying to fetch the
|
|
* dependencies.
|
|
*
|
|
* @param state The scheduler state.
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
* @param spec The task specification to queue.
|
|
* @param from_global_scheduler Whether or not the task was from a global
|
|
* scheduler. If false, the task was submitted by a worker.
|
|
* @return Void.
|
|
*/
|
|
void queue_waiting_task(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
TaskExecutionSpec &execution_spec,
|
|
bool from_global_scheduler) {
|
|
/* For actor tasks, do not queue tasks that have already been executed. */
|
|
auto spec = execution_spec.Spec();
|
|
if (!TaskSpec_actor_id(spec).is_nil()) {
|
|
auto entry =
|
|
algorithm_state->local_actor_infos.find(TaskSpec_actor_id(spec));
|
|
if (entry != algorithm_state->local_actor_infos.end()) {
|
|
/* Find the highest task counter with the same handle ID as the task to
|
|
* queue. */
|
|
auto &task_counters = entry->second.task_counters;
|
|
auto task_counter = task_counters.find(TaskSpec_actor_handle_id(spec));
|
|
if (task_counter != task_counters.end() &&
|
|
TaskSpec_actor_counter(spec) < task_counter->second) {
|
|
/* If the task to queue has a lower task counter, do not queue it. */
|
|
RAY_LOG(INFO) << "A task that has already been executed has been "
|
|
<< "resubmitted, so we are ignoring it. This should only "
|
|
<< "happen during reconstruction.";
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
RAY_LOG(DEBUG) << "Queueing task in waiting queue";
|
|
auto it = queue_task(state, algorithm_state->waiting_task_queue,
|
|
execution_spec, from_global_scheduler);
|
|
fetch_missing_dependencies(state, algorithm_state, it);
|
|
}
|
|
|
|
/**
|
|
* Queue a task whose dependencies are ready. When the task reaches the front
|
|
* of the dispatch queue and workers are available, it will be assigned.
|
|
*
|
|
* @param state The scheduler state.
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
* @param spec The task specification to queue.
|
|
* @param from_global_scheduler Whether or not the task was from a global
|
|
* scheduler. If false, the task was submitted by a worker.
|
|
* @return Void.
|
|
*/
|
|
void queue_dispatch_task(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
TaskExecutionSpec &execution_spec,
|
|
bool from_global_scheduler) {
|
|
RAY_LOG(DEBUG) << "Queueing task in dispatch queue";
|
|
TaskSpec *spec = execution_spec.Spec();
|
|
if (TaskSpec_is_actor_task(spec)) {
|
|
queue_actor_task(state, algorithm_state, execution_spec,
|
|
from_global_scheduler);
|
|
} else {
|
|
queue_task(state, algorithm_state->dispatch_task_queue, execution_spec,
|
|
from_global_scheduler);
|
|
}
|
|
}
|
|
|
|
/**
|
|
* Add the task to the proper local scheduler queue. This assumes that the
|
|
* scheduling decision to place the task on this node has already been made,
|
|
* whether locally or by the global scheduler.
|
|
*
|
|
* @param state The scheduler state.
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
* @param spec The task specification to queue.
|
|
* @param from_global_scheduler Whether or not the task was from a global
|
|
* scheduler. If false, the task was submitted by a worker.
|
|
* @return Void.
|
|
*/
|
|
void queue_task_locally(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
TaskExecutionSpec &execution_spec,
|
|
bool from_global_scheduler) {
|
|
if (can_run(algorithm_state, execution_spec)) {
|
|
/* Dependencies are ready, so push the task to the dispatch queue. */
|
|
queue_dispatch_task(state, algorithm_state, execution_spec,
|
|
from_global_scheduler);
|
|
} else {
|
|
/* Dependencies are not ready, so push the task to the waiting queue. */
|
|
queue_waiting_task(state, algorithm_state, execution_spec,
|
|
from_global_scheduler);
|
|
}
|
|
}
|
|
|
|
/**
 * Failure callback registered by give_task_to_local_scheduler for the
 * task-table write of a SCHEDULED actor task.
 *
 * If the actor's location is unknown, or its local scheduler is no longer
 * alive (in which case the mapping entry is dropped), the task is resubmitted
 * through handle_actor_task_submitted. That caches it until a new actor
 * creation notification arrives. If the local scheduler is still alive, the
 * assignment is simply retried.
 *
 * @param id Unused.
 * @param user_context The LocalSchedulerState.
 * @param user_data The Task whose write failed.
 */
void give_task_to_local_scheduler_retry(UniqueID id,
                                        void *user_context,
                                        void *user_data) {
  LocalSchedulerState *state = static_cast<LocalSchedulerState *>(user_context);
  Task *task = static_cast<Task *>(user_data);
  RAY_CHECK(Task_state(task) == TaskStatus::SCHEDULED);

  TaskExecutionSpec *execution_spec = Task_task_execution_spec(task);
  TaskSpec *spec = execution_spec->Spec();
  RAY_CHECK(TaskSpec_is_actor_task(spec));
  ActorID actor_id = TaskSpec_actor_id(spec);

  auto mapping = state->actor_mapping.find(actor_id);
  if (mapping == state->actor_mapping.end()) {
    // Cache the task until a new actor creation notification arrives;
    // rerun_actor_creation_tasks_timeout_handler reissues the creation tasks.
    handle_actor_task_submitted(state, state->algorithm_state, *execution_spec);
    return;
  }
  DBClientID remote_local_scheduler_id = mapping->second.local_scheduler_id;

  // TODO(rkn): db_client_table_cache_get is a blocking call, is this a
  // performance issue?
  DBClient remote_local_scheduler =
      db_client_table_cache_get(state->db, remote_local_scheduler_id);

  if (!remote_local_scheduler.is_alive) {
    // The owning local scheduler died: forget where the actor lived and cache
    // the task so that the actor gets recreated through reconstruction.
    RAY_LOG(INFO) << "Local scheduler " << remote_local_scheduler_id
                  << " that was running actor " << actor_id << " died.";
    RAY_CHECK(state->actor_mapping.count(actor_id) == 1);
    state->actor_mapping.erase(actor_id);
    handle_actor_task_submitted(state, state->algorithm_state, *execution_spec);
    return;
  }

  // Still alive: it has probably not subscribed to the right channel yet, so
  // a plain retry should be enough. This should be rare.
  give_task_to_local_scheduler(
      state, state->algorithm_state, *execution_spec,
      state->actor_mapping[actor_id].local_scheduler_id);
}
|
|
|
|
/**
|
|
* Give a task directly to another local scheduler. This is currently only used
|
|
* for assigning actor tasks to the local scheduler responsible for that actor.
|
|
*
|
|
* @param state The scheduler state.
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
* @param spec The task specification to schedule.
|
|
* @param local_scheduler_id The ID of the local scheduler to give the task to.
|
|
* @return Void.
|
|
*/
|
|
void give_task_to_local_scheduler(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
TaskExecutionSpec &execution_spec,
|
|
DBClientID local_scheduler_id) {
|
|
if (local_scheduler_id == get_db_client_id(state->db)) {
|
|
RAY_LOG(WARNING) << "Local scheduler is trying to assign a task to itself.";
|
|
}
|
|
RAY_CHECK(state->db != NULL);
|
|
/* Assign the task to the relevant local scheduler. */
|
|
RAY_CHECK(state->config.global_scheduler_exists);
|
|
Task *task =
|
|
Task_alloc(execution_spec, TaskStatus::SCHEDULED, local_scheduler_id);
|
|
#if !RAY_USE_NEW_GCS
|
|
auto retryInfo = RetryInfo{
|
|
.num_retries = 0, // This value is unused.
|
|
.timeout = 0, // This value is unused.
|
|
.fail_callback = give_task_to_local_scheduler_retry,
|
|
};
|
|
|
|
task_table_add_task(state->db, task, &retryInfo, NULL, state);
|
|
#else
|
|
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
|
|
Task_free(task);
|
|
#endif
|
|
}
|
|
|
|
/**
 * Failure callback registered by give_task_to_global_scheduler. It re-submits
 * the same non-actor WAITING task to the global scheduler.
 *
 * @param id Unused.
 * @param user_context The LocalSchedulerState.
 * @param user_data The Task whose write failed.
 */
void give_task_to_global_scheduler_retry(UniqueID id,
                                         void *user_context,
                                         void *user_data) {
  LocalSchedulerState *state = static_cast<LocalSchedulerState *>(user_context);
  Task *task = static_cast<Task *>(user_data);
  RAY_CHECK(Task_state(task) == TaskStatus::WAITING);

  TaskExecutionSpec *execution_spec = Task_task_execution_spec(task);
  RAY_CHECK(!TaskSpec_is_actor_task(execution_spec->Spec()));

  give_task_to_global_scheduler(state, state->algorithm_state, *execution_spec);
}
|
|
|
|
/**
|
|
* Give a task to the global scheduler to schedule.
|
|
*
|
|
* @param state The scheduler state.
|
|
* @param algorithm_state The scheduling algorithm state.
|
|
* @param spec The task specification to schedule.
|
|
* @return Void.
|
|
*/
|
|
void give_task_to_global_scheduler(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
TaskExecutionSpec &execution_spec) {
|
|
if (state->db == NULL || !state->config.global_scheduler_exists) {
|
|
/* A global scheduler is not available, so queue the task locally. */
|
|
queue_task_locally(state, algorithm_state, execution_spec, false);
|
|
return;
|
|
}
|
|
/* Pass on the task to the global scheduler. */
|
|
RAY_CHECK(state->config.global_scheduler_exists);
|
|
Task *task = Task_alloc(execution_spec, TaskStatus::WAITING,
|
|
get_db_client_id(state->db));
|
|
#if !RAY_USE_NEW_GCS
|
|
RAY_CHECK(state->db != NULL);
|
|
auto retryInfo = RetryInfo{
|
|
.num_retries = 0, // This value is unused.
|
|
.timeout = 0, // This value is unused.
|
|
.fail_callback = give_task_to_global_scheduler_retry,
|
|
};
|
|
task_table_add_task(state->db, task, &retryInfo, NULL, state);
|
|
#else
|
|
RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
|
|
Task_free(task);
|
|
#endif
|
|
}
|
|
|
|
/**
 * Look up a resource quantity without modifying the map.
 *
 * Returns the stored quantity, or a value-initialized quantity (0) when the
 * resource is absent. That is the value operator[] would have produced, but
 * this function never inserts an entry.
 */
template <typename ResourceMap>
static typename ResourceMap::mapped_type resource_quantity_or_zero(
    const ResourceMap &resources,
    const typename ResourceMap::key_type &resource_name) {
  auto entry = resources.find(resource_name);
  if (entry == resources.end()) {
    return typename ResourceMap::mapped_type();
  }
  return entry->second;
}

/**
 * Check whether this local scheduler can satisfy a task's resource demands.
 *
 * A demand fails if it exceeds either the static or the dynamic quantity of
 * that resource. Resources this node does not know about count as 0. The
 * lookups are read-only, so calling this predicate does not add entries to
 * state->static_resources or state->dynamic_resources.
 *
 * @param state The scheduler state.
 * @param spec The task specification.
 * @return True if all demands fit. An actor creation task additionally
 *         returns false whenever this node's static CPU count is nonzero.
 */
bool resource_constraints_satisfied(LocalSchedulerState *state,
                                    TaskSpec *spec) {
  for (auto const &resource_pair : TaskSpec_get_required_resources(spec)) {
    double required_resource = resource_pair.second;
    if (required_resource > resource_quantity_or_zero(state->static_resources,
                                                      resource_pair.first) ||
        required_resource > resource_quantity_or_zero(state->dynamic_resources,
                                                      resource_pair.first)) {
      return false;
    }
  }

  /* Actor creation tasks are only accepted here when the static CPU count is
   * 0. NOTE(review): presumably this keeps actor placement with the global
   * scheduler; confirm the intent against handle_task_submitted. */
  if (TaskSpec_is_actor_creation_task(spec) &&
      resource_quantity_or_zero(state->static_resources, "CPU") != 0) {
    return false;
  }

  return true;
}
|
|
|
|
/**
 * Handle a task submitted by a worker on this node.
 *
 * The task is put straight on the dispatch path only if all of these hold:
 * its resource constraints are satisfied, a worker is available, its
 * dependencies are local, and it is not an actor creation task. Otherwise it
 * is handed to give_task_to_global_scheduler. Either way a dispatch pass
 * follows.
 *
 * Actor creation tasks always go to the global scheduler, to improve actor
 * load balancing and avoid starting every actor locally; see
 * https://github.com/ray-project/ray/issues/1756.
 *
 * @param state The scheduler state.
 * @param algorithm_state The scheduling algorithm state.
 * @param execution_spec The submitted task.
 */
void handle_task_submitted(LocalSchedulerState *state,
                           SchedulingAlgorithmState *algorithm_state,
                           TaskExecutionSpec &execution_spec) {
  TaskSpec *spec = execution_spec.Spec();
  /* TODO(atumanov): if static is satisfied and local objects ready, but dynamic
   * resource is currently unavailable, then consider queueing task locally and
   * recheck dynamic next time. */

  bool const keep_local = resource_constraints_satisfied(state, spec) &&
                          !algorithm_state->available_workers.empty() &&
                          can_run(algorithm_state, execution_spec) &&
                          !TaskSpec_is_actor_creation_task(spec);

  if (keep_local) {
    queue_dispatch_task(state, algorithm_state, execution_spec, false);
  } else {
    give_task_to_global_scheduler(state, algorithm_state, execution_spec);
  }

  /* A task may have just been queued, so try dispatching. */
  dispatch_tasks(state, algorithm_state);
}
|
|
|
|
/**
 * Route a submitted actor task.
 *
 * If the actor's location is unknown, the task is appended to
 * cached_submitted_actor_tasks (it is resubmitted when an actor creation
 * notification arrives) and written to the task table as ACTOR_CACHED. If the
 * actor lives on this local scheduler, the task is queued locally and actor
 * dispatch is attempted. Otherwise the task is given to the owning local
 * scheduler.
 *
 * @param state The scheduler state.
 * @param algorithm_state The scheduling algorithm state.
 * @param execution_spec The actor task.
 */
void handle_actor_task_submitted(LocalSchedulerState *state,
                                 SchedulingAlgorithmState *algorithm_state,
                                 TaskExecutionSpec &execution_spec) {
  TaskSpec *task_spec = execution_spec.Spec();
  RAY_CHECK(TaskSpec_is_actor_task(task_spec));
  ActorID actor_id = TaskSpec_actor_id(task_spec);

  auto mapping = state->actor_mapping.find(actor_id);
  if (mapping == state->actor_mapping.end()) {
    /* Build the task-table record first. */
    Task *task = Task_alloc(
        task_spec, execution_spec.SpecSize(), TaskStatus::ACTOR_CACHED,
        get_db_client_id(state->db), execution_spec.ExecutionDependencies());

    /* NOTE(review): execution_spec may itself be an element of
     * cached_submitted_actor_tasks (handle_actor_creation_notification passes
     * such elements in). Appending may reallocate that container, so neither
     * execution_spec nor task_spec is used after this append. */
    algorithm_state->cached_submitted_actor_tasks.push_back(
        TaskExecutionSpec(&execution_spec));

#if !RAY_USE_NEW_GCS
    // Even if the task can't be assigned to a worker yet, we should still write
    // it to the task table. TODO(rkn): There's no need to do this more than
    // once, and we could run into problems if we have very large numbers of
    // tasks in this cache.
    task_table_add_task(state->db, task, NULL, NULL, NULL);
#else
    RAY_CHECK_OK(TaskTableAdd(&state->gcs_client, task));
    Task_free(task);
#endif
    return;
  }

  DBClientID owner_id = mapping->second.local_scheduler_id;
  if (!(owner_id == get_db_client_id(state->db))) {
    /* Another local scheduler owns the actor; send the task there. */
    give_task_to_local_scheduler(state, algorithm_state, execution_spec,
                                 owner_id);
    return;
  }

  /* This local scheduler owns the actor. */
  queue_task_locally(state, algorithm_state, execution_spec, false);
  dispatch_actor_task(state, algorithm_state, actor_id);
}
|
|
|
|
/**
 * React to an actor creation notification by resubmitting every cached actor
 * task through handle_actor_task_submitted.
 *
 * Tasks that are still unplaceable get re-appended by
 * handle_actor_task_submitted. Only the entries that existed before the loop
 * are erased afterwards, so those re-appended tasks survive.
 *
 * @param state The scheduler state.
 * @param algorithm_state The scheduling algorithm state.
 * @param actor_id Unused; all cached tasks are resubmitted.
 */
void handle_actor_creation_notification(
    LocalSchedulerState *state,
    SchedulingAlgorithmState *algorithm_state,
    ActorID actor_id) {
  auto &cached_tasks = algorithm_state->cached_submitted_actor_tasks;
  int const num_to_resubmit = cached_tasks.size();

  /* Index-based on purpose: the container can grow (and reallocate) during
   * the loop, so iterators or range-for would be unsafe. */
  for (int index = 0; index < num_to_resubmit; ++index) {
    handle_actor_task_submitted(state, algorithm_state, cached_tasks[index]);
  }

  cached_tasks.erase(cached_tasks.begin(),
                     cached_tasks.begin() + num_to_resubmit);
}
|
|
|
|
/**
 * Accept a task that the global scheduler assigned to this local scheduler.
 * The task is queued locally and a dispatch pass is attempted.
 *
 * @param state The scheduler state. It must have a database connection, a
 *        global scheduler, and a nonzero static CPU count.
 * @param algorithm_state The scheduling algorithm state.
 * @param execution_spec The assigned task.
 */
void handle_task_scheduled(LocalSchedulerState *state,
                           SchedulingAlgorithmState *algorithm_state,
                           TaskExecutionSpec &execution_spec) {
  /* Only the global scheduler calls into this path, so both of these must
   * exist. */
  RAY_CHECK(state->db != NULL);
  RAY_CHECK(state->config.global_scheduler_exists);
  /* The global scheduler currently never assigns work to a 0-CPU node. */
  RAY_CHECK(state->static_resources["CPU"] != 0);

  queue_task_locally(state, algorithm_state, execution_spec, true);
  dispatch_tasks(state, algorithm_state);
}
|
|
|
|
/**
 * Accept an actor task assigned to this local scheduler, either by the global
 * scheduler or by another local scheduler.
 *
 * If this node already knows where the actor lives, it must be this node.
 * If the actor creation notification has not been processed yet, an INFO
 * message is logged and the task is accepted anyway.
 *
 * @param state The scheduler state.
 * @param algorithm_state The scheduling algorithm state.
 * @param execution_spec The assigned actor task.
 */
void handle_actor_task_scheduled(LocalSchedulerState *state,
                                 SchedulingAlgorithmState *algorithm_state,
                                 TaskExecutionSpec &execution_spec) {
  RAY_CHECK(state->db != NULL);
  RAY_CHECK(state->config.global_scheduler_exists);

  TaskSpec *spec = execution_spec.Spec();
  RAY_CHECK(TaskSpec_is_actor_task(spec));
  ActorID actor_id = TaskSpec_actor_id(spec);

  auto mapping = state->actor_mapping.find(actor_id);
  if (mapping != state->actor_mapping.end()) {
    RAY_CHECK(mapping->second.local_scheduler_id ==
              get_db_client_id(state->db));
  } else {
    /* The actor's task arrived before its creation notification. This is
     * uncommon but harmless. */
    RAY_LOG(INFO) << "handle_actor_task_scheduled called on local scheduler "
                  << "but the corresponding actor_map_entry is not present. "
                  << "This should be rare.";
  }

  queue_task_locally(state, algorithm_state, execution_spec, true);
  dispatch_actor_task(state, algorithm_state, actor_id);
}
|
|
|
|
/**
 * Mark a worker as available and try to give it work.
 *
 * The worker must have no task in progress and must not already be in the
 * available or blocked pools. If it is in the executing pool (it just finished
 * a task), it is removed from there. A newly connected worker will not be in
 * that pool.
 *
 * @param state The scheduler state.
 * @param algorithm_state The scheduling algorithm state.
 * @param worker The worker that became available.
 */
void handle_worker_available(LocalSchedulerState *state,
                             SchedulingAlgorithmState *algorithm_state,
                             LocalSchedulerClient *worker) {
  RAY_CHECK(worker->task_in_progress == NULL);
  RAY_CHECK(!worker_in_vector(algorithm_state->available_workers, worker));
  RAY_CHECK(!worker_in_vector(algorithm_state->blocked_workers, worker));

  /* Leave the executing pool (a no-op for a freshly connected worker) and
   * confirm the removal took effect. */
  remove_worker_from_vector(algorithm_state->executing_workers, worker);
  RAY_CHECK(!worker_in_vector(algorithm_state->executing_workers, worker));

  algorithm_state->available_workers.push_back(worker);
  dispatch_all_tasks(state, algorithm_state);
}
|
|
|
|
/**
 * Forget a non-actor worker that has gone away, then try to dispatch, because
 * resources may have been freed.
 *
 * The worker is removed from the available, executing and blocked pools, and
 * each removal is verified. It may have been in at most one of them.
 *
 * @param state The scheduler state.
 * @param algorithm_state The scheduling algorithm state.
 * @param worker The removed worker. It must not be an actor worker.
 */
void handle_worker_removed(LocalSchedulerState *state,
                           SchedulingAlgorithmState *algorithm_state,
                           LocalSchedulerClient *worker) {
  RAY_CHECK(worker->actor_id.is_nil());

  int num_times_removed = 0;

  if (remove_worker_from_vector(algorithm_state->available_workers, worker)) {
    ++num_times_removed;
  }
  RAY_CHECK(!worker_in_vector(algorithm_state->available_workers, worker));

  if (remove_worker_from_vector(algorithm_state->executing_workers, worker)) {
    ++num_times_removed;
  }
  RAY_CHECK(!worker_in_vector(algorithm_state->executing_workers, worker));

  if (remove_worker_from_vector(algorithm_state->blocked_workers, worker)) {
    ++num_times_removed;
  }
  RAY_CHECK(!worker_in_vector(algorithm_state->blocked_workers, worker));

  /* A worker lives in at most one pool at a time. */
  RAY_CHECK(num_times_removed <= 1);

  dispatch_all_tasks(state, algorithm_state);
}
|
|
|
|
/**
 * Clean up after an actor's worker disconnects.
 *
 * When @p cleanup is false, the actor is first marked as removed: in the
 * actor table if a database connection exists, and in state->removed_actors.
 * Its in-progress task and every task still in its local queue are passed to
 * finish_killed_task. In every case the actor is then removed from the
 * scheduling state, a dispatch pass is run since resources may have been
 * freed, and a new worker is started to replenish the worker pool.
 *
 * @param state The local scheduler state.
 * @param algorithm_state The scheduling algorithm state.
 * @param worker The disconnecting worker. Its actor must be known locally
 *        when @p cleanup is false.
 * @param cleanup If true, skip marking the actor removed and failing its
 *        tasks.
 */
void handle_actor_worker_disconnect(LocalSchedulerState *state,
                                    SchedulingAlgorithmState *algorithm_state,
                                    LocalSchedulerClient *worker,
                                    bool cleanup) {
  const bool fail_actor_tasks = !cleanup;
  if (fail_actor_tasks) {
    /* Publish the removal when connected to the database. */
    if (state->db) {
      actor_table_mark_removed(state->db, worker->actor_id);
    }

    /* The task the worker was running, if any, is failed as well. */
    if (worker->task_in_progress) {
      finish_killed_task(state,
                         *Task_task_execution_spec(worker->task_in_progress));
    }

    state->removed_actors.insert(worker->actor_id);

    /* Fail every task that is still queued for this actor. */
    RAY_CHECK(algorithm_state->local_actor_infos.count(worker->actor_id) != 0);
    LocalActorInfo &actor_info =
        algorithm_state->local_actor_infos.find(worker->actor_id)->second;
    for (auto &queued_task : *actor_info.task_queue) {
      finish_killed_task(state, queued_task);
    }
  }

  remove_actor(algorithm_state, worker->actor_id);

  /* The actor's resources may have been released; try to dispatch. */
  dispatch_all_tasks(state, algorithm_state);

  /* Replace the departed actor's worker so the pool stays topped up. */
  start_worker(state);
}
|
|
|
|
/* NOTE(swang): For tasks that saved a checkpoint, we should consider
|
|
* overwriting the result table entries for the current task frontier to
|
|
* avoid duplicate task submissions during reconstruction. */
|
|
void handle_actor_worker_available(LocalSchedulerState *state,
|
|
SchedulingAlgorithmState *algorithm_state,
|
|
LocalSchedulerClient *worker) {
|
|
ActorID actor_id = worker->actor_id;
|
|
RAY_CHECK(!actor_id.is_nil());
|
|
/* Get the actor info for this worker. */
|
|
RAY_CHECK(algorithm_state->local_actor_infos.count(actor_id) == 1);
|
|
LocalActorInfo &entry =
|
|
algorithm_state->local_actor_infos.find(actor_id)->second;
|
|
RAY_CHECK(worker == entry.worker);
|
|
RAY_CHECK(!entry.worker_available);
|
|
/* If an actor task was assigned, mark returned dummy object as locally
|
|
* available. This is not added to the object table, so the update will be
|
|
* invisible to other nodes. */
|
|
/* NOTE(swang): These objects are never cleaned up. We should consider
|
|
* removing the objects, e.g., when an actor is terminated. */
|
|
if (!entry.execution_dependency.is_nil()) {
|
|
handle_object_available(state, algorithm_state, entry.execution_dependency);
|
|
}
|
|
/* Unset the fields indicating an assigned task. */
|
|
entry.worker_available = true;
|
|
/* Assign new tasks if possible. */
|
|
dispatch_all_tasks(state, algorithm_state);
|
|
}
|
|
|
|
/**
 * Move a worker from the executing pool to the blocked pool.
 *
 * The worker must currently be executing and must not already be blocked.
 * A dispatch pass follows because the blocked worker may have freed up
 * resources.
 *
 * @param state The local scheduler state.
 * @param algorithm_state The scheduling algorithm state owning the pools.
 * @param worker The worker that reported it is blocked.
 */
void handle_worker_blocked(LocalSchedulerState *state,
                           SchedulingAlgorithmState *algorithm_state,
                           LocalSchedulerClient *worker) {
  /* Pull the worker out of the executing pool; it has to be there. */
  RAY_CHECK(
      remove_worker_from_vector(algorithm_state->executing_workers, worker));

  /* Guard against double-blocking before recording it as blocked. */
  RAY_CHECK(!worker_in_vector(algorithm_state->blocked_workers, worker));
  algorithm_state->blocked_workers.push_back(worker);

  /* Resources may have been freed up, so try to hand out queued work. */
  dispatch_all_tasks(state, algorithm_state);
}
|
|
|
|
/**
 * Handle an actor worker reporting that it is blocked.
 *
 * No pool bookkeeping is done here; only a dispatch pass is run, since some
 * resources may have been freed up.
 *
 * TODO: The actor path has no equivalent of the blocked_workers and
 * executing_workers lists used for regular workers. It is an open question
 * whether it needs them.
 */
void handle_actor_worker_blocked(LocalSchedulerState *state,
                                 SchedulingAlgorithmState *algorithm_state,
                                 LocalSchedulerClient *worker) {
  dispatch_all_tasks(state, algorithm_state);
}
|
|
|
|
/**
 * Move a worker from the blocked pool back to the executing pool.
 *
 * The worker must currently be blocked and must not already be executing.
 * No dispatch pass is run here.
 *
 * @param state The local scheduler state (unused).
 * @param algorithm_state The scheduling algorithm state owning the pools.
 * @param worker The worker that reported it is no longer blocked.
 */
void handle_worker_unblocked(LocalSchedulerState *state,
                             SchedulingAlgorithmState *algorithm_state,
                             LocalSchedulerClient *worker) {
  /* Pull the worker out of the blocked pool; it has to be there. */
  RAY_CHECK(
      remove_worker_from_vector(algorithm_state->blocked_workers, worker));

  /* Guard against a duplicate entry before recording it as executing. */
  RAY_CHECK(!worker_in_vector(algorithm_state->executing_workers, worker));
  algorithm_state->executing_workers.push_back(worker);
}
|
|
|
|
/**
 * Handle an actor worker reporting that it is no longer blocked.
 *
 * Currently a no-op: actor workers are not tracked in the blocked or
 * executing pools, and no dispatch pass is run.
 */
void handle_actor_worker_unblocked(LocalSchedulerState *state,
                                   SchedulingAlgorithmState *algorithm_state,
                                   LocalSchedulerClient *worker) {
  /* Nothing to do. */
}
|
|
|
|
/**
 * Record that an object has become available in the local object store.
 *
 * Any active fetch entry for the object is moved from remote_objects into
 * local_objects. Every task registered as waiting on the object is then
 * re-examined:
 * - Tasks that can now run are moved out of the waiting queue, into the
 *   actor task queue for actor tasks or the dispatch queue otherwise.
 * - Tasks still missing other dependencies stay in the waiting queue.
 * If any task was waiting on the object, a dispatch pass is run.
 *
 * @param state The local scheduler state.
 * @param algorithm_state The scheduling algorithm state.
 * @param object_id The object that is now local. It must not already be
 *        recorded in local_objects.
 */
void handle_object_available(LocalSchedulerState *state,
                             SchedulingAlgorithmState *algorithm_state,
                             ObjectID object_id) {
  /* Take over the active fetch entry for this object, if there is one;
   * otherwise start from an empty entry. */
  ObjectEntry entry;
  auto object_entry_it = algorithm_state->remote_objects.find(object_id);
  if (object_entry_it != algorithm_state->remote_objects.end()) {
    entry = std::move(object_entry_it->second);
    algorithm_state->remote_objects.erase(object_entry_it);
  }

  /* Detach the waiting-task iterators before storing the entry. Runnable
   * tasks are erased from the waiting queue below, so keeping these
   * iterators inside local_objects would leave them dangling. swap()
   * guarantees the stored entry's vector is empty. */
  std::vector<std::list<TaskExecutionSpec>::iterator> dependent_tasks;
  dependent_tasks.swap(entry.dependent_tasks);

  /* Record the object as local before calling can_run below. can_run
   * presumably consults local_objects, so this ordering matters. */
  RAY_CHECK(algorithm_state->local_objects.count(object_id) == 0);
  algorithm_state->local_objects[object_id] = std::move(entry);

  if (dependent_tasks.empty()) {
    return;
  }

  /* A task that lists this object more than once can be registered more than
   * once. Once a runnable task is erased from the waiting queue, any other
   * copy of its iterator is invalid, so deduplicate now, while every iterator
   * is still valid. The map is used as a set keyed by element address. */
  std::vector<std::list<TaskExecutionSpec>::iterator> unique_tasks;
  unique_tasks.reserve(dependent_tasks.size());
  std::unordered_map<const TaskExecutionSpec *, bool> seen_tasks;
  for (auto task_it : dependent_tasks) {
    if (seen_tasks.emplace(&*task_it, true).second) {
      unique_tasks.push_back(task_it);
    }
  }

  /* Move every task that is now ready to run to the appropriate queue. */
  for (auto task_it : unique_tasks) {
    if (!can_run(algorithm_state, *task_it)) {
      /* Still waiting on some other dependency. */
      continue;
    }
    if (TaskSpec_is_actor_task(task_it->Spec())) {
      insert_actor_task_queue(state, algorithm_state, std::move(*task_it));
    } else {
      algorithm_state->dispatch_task_queue->push_back(std::move(*task_it));
    }
    /* Drop the moved-from entry from the waiting queue. */
    algorithm_state->waiting_task_queue->erase(task_it);
  }

  /* Some tasks may have left the waiting queue; try to dispatch them. */
  dispatch_all_tasks(state, algorithm_state);
}
|
|
|
|
/**
 * Record that an object has been removed from the local object store.
 *
 * The object is dropped from local_objects. Queued tasks that depend on it,
 * in the dispatch queue or in any actor's pending task queue, are moved back
 * to the waiting queue. Every waiting task that depends on the object is
 * then registered for it again via fetch_missing_dependency.
 *
 * NOTE: Objects are often removed in batches (e.g., during eviction), so the
 * queues may be scanned many times in a row. If this becomes a bottleneck,
 * consider tracking dependencies for tasks in the dispatch queue too, or
 * batching object notifications.
 *
 * @param state The local scheduler state.
 * @param removed_object_id The removed object. It must currently be recorded
 *        in local_objects.
 */
void handle_object_removed(LocalSchedulerState *state,
                           ObjectID removed_object_id) {
  SchedulingAlgorithmState *algorithm_state = state->algorithm_state;

  /* Forget that the object is local. */
  RAY_CHECK(algorithm_state->local_objects.count(removed_object_id) == 1);
  algorithm_state->local_objects.erase(removed_object_id);

  /* Tasks in the dispatch queue that depend on the object can no longer run,
   * so move them back to the waiting queue. */
  for (auto it = algorithm_state->dispatch_task_queue->begin();
       it != algorithm_state->dispatch_task_queue->end();) {
    if (it->DependsOn(removed_object_id)) {
      RAY_LOG(DEBUG) << "Moved task from dispatch queue back to waiting queue";
      algorithm_state->waiting_task_queue->push_back(std::move(*it));
      /* Drop the moved-from entry from the dispatch queue. */
      it = algorithm_state->dispatch_task_queue->erase(it);
    } else {
      ++it;
    }
  }

  /* Do the same for every actor's pending task queue. Actors whose queues
   * become empty are collected and dropped from actors_with_pending_tasks
   * after the loop, since erasing during iteration would invalidate it. */
  std::vector<ActorID> empty_actor_queues;
  for (const auto &actor_id : algorithm_state->actors_with_pending_tasks) {
    /* Bind by reference so the actor's info (including its task counter and
     * frontier maps) is not copied for every actor on every removal. */
    auto &actor_info = algorithm_state->local_actor_infos[actor_id];
    for (auto queue_it = actor_info.task_queue->begin();
         queue_it != actor_info.task_queue->end();) {
      if (queue_it->DependsOn(removed_object_id)) {
        RAY_LOG(DEBUG) << "Moved task from actor dispatch queue back to "
                       << "waiting queue";
        algorithm_state->waiting_task_queue->push_back(std::move(*queue_it));
        /* Drop the moved-from entry from the actor's queue. */
        queue_it = actor_info.task_queue->erase(queue_it);
        if (actor_info.task_queue->size() == 0) {
          empty_actor_queues.push_back(actor_id);
        }
      } else {
        ++queue_it;
      }
    }
  }
  for (const auto &actor_id : empty_actor_queues) {
    algorithm_state->actors_with_pending_tasks.erase(actor_id);
  }

  /* Re-register the dependency for every waiting task that needs the object,
   * including those just moved back from the dispatch and actor queues. */
  for (auto it = algorithm_state->waiting_task_queue->begin();
       it != algorithm_state->waiting_task_queue->end(); ++it) {
    int64_t num_dependencies = it->NumDependencies();
    for (int64_t i = 0; i < num_dependencies; ++i) {
      int count = it->DependencyIdCount(i);
      for (int j = 0; j < count; ++j) {
        ObjectID dependency_id = it->DependencyId(i, j);
        if (dependency_id == removed_object_id) {
          /* Do not request a transfer from other plasma managers if this is
           * an execution dependency. */
          bool request_transfer = it->IsStaticDependency(i);
          fetch_missing_dependency(state, algorithm_state, it,
                                   removed_object_id.to_plasma_id(),
                                   request_transfer);
        }
      }
    }
  }
}
|
|
|
|
/**
 * Drop every locally queued task that was submitted by a removed driver.
 *
 * Tasks for @p driver_id are removed from:
 * - the dependent-task lists in remote_objects; a fetch entry left with no
 *   dependent tasks is removed entirely;
 * - the waiting and dispatch task queues;
 * - cached_submitted_actor_tasks.
 * Actor task queues are not touched.
 *
 * @param state The local scheduler state (unused).
 * @param algorithm_state The scheduling algorithm state.
 * @param driver_id The ID of the driver that was removed.
 */
void handle_driver_removed(LocalSchedulerState *state,
                           SchedulingAlgorithmState *algorithm_state,
                           WorkerID driver_id) {
  /* True iff the task was submitted by the removed driver. */
  auto from_removed_driver = [&driver_id](TaskExecutionSpec &task) {
    return TaskSpec_driver_id(task.Spec()) == driver_id;
  };

  /* Fetch bookkeeping goes first: remote_objects holds iterators into the
   * task queues, and the queue cleanup below would invalidate them. */
  for (auto obj_it = algorithm_state->remote_objects.begin();
       obj_it != algorithm_state->remote_objects.end();) {
    auto &waiters = obj_it->second.dependent_tasks;
    for (auto waiter = waiters.begin(); waiter != waiters.end();) {
      if (from_removed_driver(**waiter)) {
        waiter = waiters.erase(waiter);
      } else {
        ++waiter;
      }
    }
    /* Forget the fetch entry once no task is waiting on the object. */
    if (waiters.empty()) {
      obj_it = algorithm_state->remote_objects.erase(obj_it);
    } else {
      ++obj_it;
    }
  }

  /* Purge the driver's tasks from both task queues. */
  algorithm_state->waiting_task_queue->remove_if(from_removed_driver);
  algorithm_state->dispatch_task_queue->remove_if(from_removed_driver);

  /* Purge the driver's tasks from the cached actor tasks. This scan can be
   * slow if the cache of actor tasks is very long. */
  auto &cached = algorithm_state->cached_submitted_actor_tasks;
  for (auto cached_it = cached.begin(); cached_it != cached.end();) {
    if (TaskSpec_driver_id(cached_it->Spec()) == driver_id) {
      cached_it = cached.erase(cached_it);
    } else {
      ++cached_it;
    }
  }

  /* TODO(rkn): Should we clean up the actor data structures? */
}
|
|
|
|
/** Return how many tasks are currently in the waiting task queue. */
int num_waiting_tasks(SchedulingAlgorithmState *algorithm_state) {
  const auto queue_length = algorithm_state->waiting_task_queue->size();
  return static_cast<int>(queue_length);
}
|
|
|
|
/** Return how many tasks are currently in the dispatch task queue. */
int num_dispatch_tasks(SchedulingAlgorithmState *algorithm_state) {
  const auto queue_length = algorithm_state->dispatch_task_queue->size();
  return static_cast<int>(queue_length);
}
|
|
|
|
void print_worker_info(const char *message,
|
|
SchedulingAlgorithmState *algorithm_state) {
|
|
RAY_LOG(DEBUG) << message << ": " << algorithm_state->available_workers.size()
|
|
<< " available, " << algorithm_state->executing_workers.size()
|
|
<< " executing, " << algorithm_state->blocked_workers.size()
|
|
<< " blocked";
|
|
}
|
|
|
|
/**
 * Return a copy of the per-handle task counters for a locally known actor.
 *
 * @param algorithm_state The scheduling algorithm state.
 * @param actor_id The actor; it must be present in local_actor_infos.
 * @return A map from actor handle ID to that handle's task counter.
 */
std::unordered_map<ActorHandleID, int64_t> get_actor_task_counters(
    SchedulingAlgorithmState *algorithm_state,
    ActorID actor_id) {
  RAY_CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0);
  const auto actor_it = algorithm_state->local_actor_infos.find(actor_id);
  return actor_it->second.task_counters;
}
|
|
|
|
/**
 * Overwrite an actor's per-handle task counters and drop queued tasks that
 * fall before them.
 *
 * Overwriting is needed during reconstruction when resuming from a
 * checkpoint, so that the task frontier resumes where the checkpoint was
 * saved. A task is dropped when it was submitted through a handle with a
 * restored counter greater than the task's own counter; such tasks ran
 * before the checkpoint and should not run again. Tasks are filtered out of
 * the actor's runnable queue and out of the waiting queue; for waiting tasks
 * the pending object dependencies are cleared first.
 *
 * @param algorithm_state The scheduling algorithm state.
 * @param actor_id The actor; it must be present in local_actor_infos.
 * @param task_counters The restored map from actor handle ID to counter.
 */
void set_actor_task_counters(
    SchedulingAlgorithmState *algorithm_state,
    ActorID actor_id,
    const std::unordered_map<ActorHandleID, int64_t> &task_counters) {
  RAY_CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0);
  auto &entry = algorithm_state->local_actor_infos[actor_id];
  entry.task_counters = task_counters;

  /* True iff `spec` comes from a handle whose restored counter exceeds the
   * task's own counter. Handles with no restored counter never filter.
   * find() is used so that no zero counters are inserted into the actor's
   * task_counters as a side effect. */
  auto is_before_checkpoint = [&entry](TaskSpec *spec) {
    auto counter_it =
        entry.task_counters.find(TaskSpec_actor_handle_id(spec));
    return counter_it != entry.task_counters.end() &&
           TaskSpec_actor_counter(spec) < counter_it->second;
  };

  /* Filter duplicate tasks out of the actor's runnable queue. */
  for (auto it = entry.task_queue->begin(); it != entry.task_queue->end();) {
    if (is_before_checkpoint(it->Spec())) {
      it = entry.task_queue->erase(it);
    } else {
      ++it;
    }
  }

  /* Filter duplicate tasks for this actor that are waiting on a missing
   * dependency. Their object dependencies are cleared before removal. */
  for (auto it = algorithm_state->waiting_task_queue->begin();
       it != algorithm_state->waiting_task_queue->end();) {
    TaskSpec *spec = it->Spec();
    if (TaskSpec_actor_id(spec) == actor_id && is_before_checkpoint(spec)) {
      clear_missing_dependencies(algorithm_state, it);
      it = algorithm_state->waiting_task_queue->erase(it);
    } else {
      ++it;
    }
  }
}
|
|
|
|
/**
 * Return a copy of the frontier dependencies recorded for a locally known
 * actor.
 *
 * @param algorithm_state The scheduling algorithm state.
 * @param actor_id The actor; it must be present in local_actor_infos.
 * @return A map from actor handle ID to that handle's frontier object ID.
 */
std::unordered_map<ActorHandleID, ObjectID> get_actor_frontier(
    SchedulingAlgorithmState *algorithm_state,
    ActorID actor_id) {
  RAY_CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0);
  const auto actor_it = algorithm_state->local_actor_infos.find(actor_id);
  return actor_it->second.frontier_dependencies;
}
|
|
|
|
/**
 * Overwrite an actor's frontier dependencies and mark each of them as
 * locally available.
 *
 * Each frontier object that is not yet recorded in local_objects is passed
 * to handle_object_available, which may make waiting tasks runnable.
 * (Presumably these are the dummy objects of the checkpointed frontier tasks;
 * confirm against the caller.)
 *
 * @param state The local scheduler state.
 * @param algorithm_state The scheduling algorithm state.
 * @param actor_id The actor; it must be present in local_actor_infos.
 * @param frontier_dependencies Map from actor handle ID to frontier object.
 */
void set_actor_frontier(
    LocalSchedulerState *state,
    SchedulingAlgorithmState *algorithm_state,
    ActorID actor_id,
    const std::unordered_map<ActorHandleID, ObjectID> &frontier_dependencies) {
  RAY_CHECK(algorithm_state->local_actor_infos.count(actor_id) != 0);
  /* Bind by reference. Assigning through a copy would discard the new
   * frontier as soon as this function returns. */
  auto &entry = algorithm_state->local_actor_infos[actor_id];
  entry.frontier_dependencies = frontier_dependencies;

  /* Iterate the caller's map (same contents) rather than the stored one, so
   * that state changes made by handle_object_available cannot affect this
   * loop. */
  for (const auto &frontier_dependency : frontier_dependencies) {
    const ObjectID &dependency_id = frontier_dependency.second;
    if (algorithm_state->local_objects.count(dependency_id) == 0) {
      handle_object_available(state, algorithm_state, dependency_id);
    }
  }
}
|