diff --git a/.travis.yml b/.travis.yml index 083f000de..50a77933f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -125,46 +125,46 @@ matrix: # module is only found if the test directory is in the PYTHONPATH. - export PYTHONPATH="$PYTHONPATH:./test/" - - python -m pytest python/ray/common/test/test.py - - python -m pytest python/ray/common/redis_module/runtest.py - - python -m pytest python/ray/plasma/test/test.py - # - python -m pytest python/ray/local_scheduler/test/test.py - # - python -m pytest python/ray/global_scheduler/test/test.py + - python -m pytest -v python/ray/common/test/test.py + - python -m pytest -v python/ray/common/redis_module/runtest.py + - python -m pytest -v python/ray/plasma/test/test.py + # - python -m pytest -v python/ray/local_scheduler/test/test.py + # - python -m pytest -v python/ray/global_scheduler/test/test.py - - python -m pytest python/ray/test/test_queue.py - - python -m pytest test/xray_test.py + - python -m pytest -v python/ray/test/test_queue.py + - python -m pytest -v test/xray_test.py # The --assert=plain here is because pytest's assertion # rewriting mechanism seems to mess up on this file, # see https://github.com/ray-project/ray/issues/2514 - python -m pytest -v --assert=plain test/runtest.py - - python -m pytest test/array_test.py - - python -m pytest test/actor_test.py - - python -m pytest test/autoscaler_test.py - - python -m pytest test/tensorflow_test.py - - python -m pytest test/failure_test.py - - python -m pytest test/microbenchmarks.py - - python -m pytest test/stress_tests.py + - python -m pytest -v test/array_test.py + - python -m pytest -v test/actor_test.py + - python -m pytest -v test/autoscaler_test.py + - python -m pytest -v test/tensorflow_test.py + - python -m pytest -v test/failure_test.py + - python -m pytest -v test/microbenchmarks.py + - python -m pytest -v test/stress_tests.py - pytest test/component_failures_test.py - python test/multi_node_test.py - - python -m pytest test/recursion_test.py + - python -m pytest -v test/recursion_test.py - pytest test/monitor_test.py - - python -m pytest test/cython_test.py - - python -m pytest test/credis_test.py + - python -m pytest -v test/cython_test.py + - python -m pytest -v test/credis_test.py # ray tune tests - python python/ray/tune/test/dependency_test.py - - python -m pytest python/ray/tune/test/trial_runner_test.py - - python -m pytest python/ray/tune/test/trial_scheduler_test.py - - python -m pytest python/ray/tune/test/experiment_test.py - - python -m pytest python/ray/tune/test/tune_server_test.py - - python -m pytest python/ray/tune/test/ray_trial_executor_test.py + - python -m pytest -v python/ray/tune/test/trial_runner_test.py + - python -m pytest -v python/ray/tune/test/trial_scheduler_test.py + - python -m pytest -v python/ray/tune/test/experiment_test.py + - python -m pytest -v python/ray/tune/test/tune_server_test.py + - python -m pytest -v python/ray/tune/test/ray_trial_executor_test.py # ray rllib tests - - python -m pytest python/ray/rllib/test/test_catalog.py - - python -m pytest python/ray/rllib/test/test_filters.py - - python -m pytest python/ray/rllib/test/test_optimizers.py - - python -m pytest python/ray/rllib/test/test_evaluators.py + - python -m pytest -v python/ray/rllib/test/test_catalog.py + - python -m pytest -v python/ray/rllib/test/test_filters.py + - python -m pytest -v python/ray/rllib/test/test_optimizers.py + - python -m pytest -v python/ray/rllib/test/test_evaluators.py install: @@ -197,46 +197,46 @@ script: # module is only found if the test directory is in the PYTHONPATH. - export PYTHONPATH="$PYTHONPATH:./test/" - - python -m pytest python/ray/common/test/test.py - - python -m pytest python/ray/common/redis_module/runtest.py - - python -m pytest python/ray/plasma/test/test.py - - python -m pytest python/ray/local_scheduler/test/test.py - - python -m pytest python/ray/global_scheduler/test/test.py + - python -m pytest -v python/ray/common/test/test.py + - python -m pytest -v python/ray/common/redis_module/runtest.py + - python -m pytest -v python/ray/plasma/test/test.py + - python -m pytest -v python/ray/local_scheduler/test/test.py + - python -m pytest -v python/ray/global_scheduler/test/test.py - - python -m pytest python/ray/test/test_queue.py - - python -m pytest test/xray_test.py + - python -m pytest -v python/ray/test/test_queue.py + - python -m pytest -v test/xray_test.py # The --assert=plain here is because pytest's assertion # rewriting mechanism seems to mess up on this file, # see https://github.com/ray-project/ray/issues/2514 - python -m pytest --assert=plain -v test/runtest.py - - python -m pytest test/array_test.py - - python -m pytest test/actor_test.py - - python -m pytest test/autoscaler_test.py - - python -m pytest test/tensorflow_test.py - - python -m pytest test/failure_test.py - - python -m pytest test/microbenchmarks.py - - python -m pytest test/stress_tests.py - - python -m pytest test/component_failures_test.py + - python -m pytest -v test/array_test.py + - python -m pytest -v test/actor_test.py + - python -m pytest -v test/autoscaler_test.py + - python -m pytest -v test/tensorflow_test.py + - python -m pytest -v test/failure_test.py + - python -m pytest -v test/microbenchmarks.py + - python -m pytest -v test/stress_tests.py + - python -m pytest -v test/component_failures_test.py - python test/multi_node_test.py - - python -m pytest test/recursion_test.py - - python -m pytest test/monitor_test.py - - python -m pytest test/cython_test.py - - python -m pytest test/credis_test.py + - python -m pytest -v test/recursion_test.py + - python -m pytest -v test/monitor_test.py + - python -m pytest -v test/cython_test.py + - python -m pytest -v test/credis_test.py # ray tune tests - python python/ray/tune/test/dependency_test.py - - python -m pytest python/ray/tune/test/trial_runner_test.py - - python -m pytest python/ray/tune/test/trial_scheduler_test.py - - python -m pytest python/ray/tune/test/experiment_test.py - - python -m pytest python/ray/tune/test/tune_server_test.py - - python -m pytest python/ray/tune/test/ray_trial_executor_test.py + - python -m pytest -v python/ray/tune/test/trial_runner_test.py + - python -m pytest -v python/ray/tune/test/trial_scheduler_test.py + - python -m pytest -v python/ray/tune/test/experiment_test.py + - python -m pytest -v python/ray/tune/test/tune_server_test.py + - python -m pytest -v python/ray/tune/test/ray_trial_executor_test.py # ray rllib tests - - python -m pytest python/ray/rllib/test/test_catalog.py - - python -m pytest python/ray/rllib/test/test_filters.py - - python -m pytest python/ray/rllib/test/test_optimizers.py - - python -m pytest python/ray/rllib/test/test_evaluators.py + - python -m pytest -v python/ray/rllib/test/test_catalog.py + - python -m pytest -v python/ray/rllib/test/test_filters.py + - python -m pytest -v python/ray/rllib/test/test_optimizers.py + - python -m pytest -v python/ray/rllib/test/test_evaluators.py deploy: - provider: s3 diff --git a/python/ray/monitor.py b/python/ray/monitor.py index ac06a3042..bcaab4d2a 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -337,7 +337,7 @@ class Monitor(object): static_resources[static] = message.ResourcesTotalCapacity(i) # Update the load metrics for this local scheduler. - client_id = message.ClientId().decode("utf-8") + client_id = ray.utils.binary_to_hex(message.ClientId()) ip = self.local_scheduler_id_to_ip_map.get(client_id) if ip: self.load_metrics.update(ip, static_resources, dynamic_resources) diff --git a/src/ray/gcs/format/gcs.fbs b/src/ray/gcs/format/gcs.fbs index 0bdd38d63..ee667aaea 100644 --- a/src/ray/gcs/format/gcs.fbs +++ b/src/ray/gcs/format/gcs.fbs @@ -194,6 +194,9 @@ table HeartbeatTableData { // Total resource capacity configured for this node manager. resources_total_label: [string]; resources_total_capacity: [double]; + // Aggregate outstanding resource load on this node manager. + resource_load_label: [string]; + resource_load_capacity: [double]; } // Data for a lease on task execution. diff --git a/src/ray/raylet/design_docs/task_states.rst b/src/ray/raylet/design_docs/task_states.rst index c17b8f36a..7613e77c3 100644 --- a/src/ray/raylet/design_docs/task_states.rst +++ b/src/ray/raylet/design_docs/task_states.rst @@ -3,16 +3,16 @@ Task State: Definitions & Transition Diagram A task can be in one of the following states: -- **Placeable**: the task is ready to be placed at the node where is going to be - executed. This can be either local or a remote node. The decision is based on - resource availability (the location and size of the task's arguments are - ignore). If the local node has enough resources to satisfy task's demand, then - the task is placed locally, otherwise is forwarded to another node. +- **Placeable**: the task is ready to be assigned to a node (either a local or a + remote node). The decision is based on resource availability (the location and + size of the task's arguments are currently ignored). If the local node has + enough resources to satisfy task's demand, then the task is placed locally, + otherwise it is forwarded to another node. This placement decision is not + final. The task can later be spilled over to another node. - **WaitForActorCreation**: an actor method (task) is waiting for its actor to get - instantiated. Once the actor is created, the task transitions into the - waiting state, if the actor is local, or it is forwarded to the remote machine - running the actor. + instantiated. Once the actor is created, the task will be forwarded to the + remote machine running the actor. - **Waiting**: the task is waiting for its argument dependencies to be satisfied, i.e., for its arguments to be transferred to the local object store. @@ -24,18 +24,30 @@ A task can be in one of the following states: worker/actor. - **Blocked**: the task is being blocked as some data objects it depends on are not - available, e.g., because the task has launched another task and it waits - for the results, ore because of failures. + available, e.g., because the task has launched another task and is waiting + for the results. + +- **Infeasible:** the task has resource requirements that are not satisfied by + any machine. :: - forward - ------ - | | resource arguments actor/worker - | v available local available - Placeable ----------> Waiting --------> Ready ---------> Running - | ^ ^ | ^ - actor | | actor | actor worker | | worker - created | | created | created blocked | | unblocked - v | (remote) | (local) v | - WaitForActorCreation--------- Blocked + --------------------------------- + | | + | forward | forward + |---------------- | + node with ------| | arguments | + resources forward| | resource | local | actor/worker + joins | v available | --------> | available + ---------------------- Placeable ----------> Waiting Ready ---------> Running + | | | ^ ^ <-------- ^ | ^ + | |--------- | | | local arg | | | + | | | | | evicted | worker | | worker + | | actor | | | | blocked | | unblocked + | resources | created | | actor | --------------- | | + | infeasible | | | created | actor | | + | | | | (remote) | created v | + | | v | | (local) Blocked + | | WaitForActorCreation---------- + | v + ----Infeasible diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e19b6c111..bead7bd82 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -235,11 +235,13 @@ void NodeManager::Heartbeat() { RAY_LOG(DEBUG) << "[Heartbeat] sending heartbeat."; auto &heartbeat_table = gcs_client_->heartbeat_table(); auto heartbeat_data = std::make_shared(); - auto client_id = gcs_client_->client_table().GetLocalClientId(); - const SchedulingResources &local_resources = cluster_resource_map_[client_id]; - heartbeat_data->client_id = client_id.hex(); + const auto &my_client_id = gcs_client_->client_table().GetLocalClientId(); + SchedulingResources &local_resources = cluster_resource_map_[my_client_id]; + heartbeat_data->client_id = my_client_id.binary(); // TODO(atumanov): modify the heartbeat table protocol to use the ResourceSet directly. // TODO(atumanov): implement a ResourceSet const_iterator. + RAY_LOG(DEBUG) << "[Heartbeat] resources available: " + << local_resources.GetAvailableResources().ToString(); for (const auto &resource_pair : local_resources.GetAvailableResources().GetResourceMap()) { heartbeat_data->resources_available_label.push_back(resource_pair.first); @@ -250,6 +252,12 @@ void NodeManager::Heartbeat() { heartbeat_data->resources_total_capacity.push_back(resource_pair.second); } + local_resources.SetLoadResources(local_queues_.GetResourceLoad()); + for (const auto &resource_pair : local_resources.GetLoadResources().GetResourceMap()) { + heartbeat_data->resource_load_label.push_back(resource_pair.first); + heartbeat_data->resource_load_capacity.push_back(resource_pair.second); + } + ray::Status status = heartbeat_table.Add( UniqueID::nil(), gcs_client_->client_table().GetLocalClientId(), heartbeat_data, [](ray::gcs::AsyncGcsClient *client, const ClientID &id, @@ -338,7 +346,8 @@ void NodeManager::ClientRemoved(const ClientTableDataT &client_data) { void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &client_id, const HeartbeatTableDataT &heartbeat_data) { RAY_LOG(DEBUG) << "[HeartbeatAdded]: received heartbeat from client id " << client_id; - if (client_id == gcs_client_->client_table().GetLocalClientId()) { + const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId(); + if (client_id == local_client_id) { // Skip heartbeats from self. return; } @@ -351,14 +360,31 @@ void NodeManager::HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &cl << client_id; return; } - SchedulingResources &resources = it->second; - ResourceSet heartbeat_resource_available(heartbeat_data.resources_available_label, - heartbeat_data.resources_available_capacity); - resources.SetAvailableResources( - ResourceSet(heartbeat_data.resources_available_label, - heartbeat_data.resources_available_capacity)); - RAY_CHECK(this->cluster_resource_map_[client_id].GetAvailableResources() == - heartbeat_resource_available); + SchedulingResources &remote_resources = it->second; + + ResourceSet remote_available(heartbeat_data.resources_available_label, + heartbeat_data.resources_available_capacity); + ResourceSet remote_load(heartbeat_data.resource_load_label, + heartbeat_data.resource_load_capacity); + // TODO(atumanov): assert that the load is a non-empty ResourceSet. + RAY_LOG(DEBUG) << "[HeartbeatAdded]: received load: " << remote_load.ToString(); + remote_resources.SetAvailableResources(std::move(remote_available)); + // Extract the load information and save it locally. + remote_resources.SetLoadResources(std::move(remote_load)); + + auto decision = scheduling_policy_.SpillOver(remote_resources); + // Extract decision for this local scheduler. + std::unordered_set local_task_ids; + for (const auto &task_id : decision) { + // (See design_docs/task_states.rst for the state transition diagram.) + const auto task = local_queues_.RemoveTask(task_id); + // Since we are spilling back from the ready and waiting queues, we need + // to unsubscribe the dependencies. + task_dependency_manager_.UnsubscribeDependencies(task_id); + // Attempt to forward the task. If this fails to forward the task, + // the task will be resubmit locally. + ForwardTaskOrResubmit(task, client_id); + } } void NodeManager::HandleActorCreation(const ActorID &actor_id, @@ -455,13 +481,13 @@ void NodeManager::ProcessNewClient(LocalClientConnection &client) { void NodeManager::DispatchTasks() { // Work with a copy of scheduled tasks. // (See design_docs/task_states.rst for the state transition diagram.) - auto scheduled_tasks = local_queues_.GetReadyTasks(); + auto ready_tasks = local_queues_.GetReadyTasks(); // Return if there are no tasks to schedule. - if (scheduled_tasks.empty()) { + if (ready_tasks.empty()) { return; } - for (const auto &task : scheduled_tasks) { + for (const auto &task : ready_tasks) { const auto &task_resources = task.GetTaskSpecification().GetRequiredResources(); if (!local_available_resources_.Contains(task_resources)) { // Not enough local resources for this task right now, skip this task. @@ -490,6 +516,7 @@ void NodeManager::ProcessClientMessage( if (message->is_worker()) { // Register the new worker. worker_pool_.RegisterWorker(std::move(worker)); + DispatchTasks(); } else { // Register the new driver. JobID job_id = from_flatbuf(*message->driver_id()); @@ -507,6 +534,10 @@ void NodeManager::ProcessClientMessage( } // Return the worker to the idle pool. worker_pool_.PushWorker(std::move(worker)); + // Local resource availability changed: invoke scheduling policy for local node. + const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId(); + cluster_resource_map_[local_client_id].SetLoadResources( + local_queues_.GetResourceLoad()); // Call task dispatch to assign work to the new worker. DispatchTasks(); @@ -758,15 +789,22 @@ void NodeManager::ProcessNodeManagerMessage(TcpClientConnection &node_manager_cl node_manager_client.ProcessMessages(); } -void NodeManager::ScheduleTasks() { - auto policy_decision = scheduling_policy_.Schedule( - cluster_resource_map_, gcs_client_->client_table().GetLocalClientId(), - remote_clients_); +void NodeManager::ScheduleTasks( + std::unordered_map &resource_map) { + const ClientID &local_client_id = gcs_client_->client_table().GetLocalClientId(); + + // If the resource map contains the local raylet, update load before calling policy. + if (resource_map.count(local_client_id) > 0) { + resource_map[local_client_id].SetLoadResources(local_queues_.GetResourceLoad()); + } + // Invoke the scheduling policy. + auto policy_decision = scheduling_policy_.Schedule(resource_map, local_client_id); + #ifndef NDEBUG RAY_LOG(DEBUG) << "[NM ScheduleTasks] policy decision:"; - for (const auto &pair : policy_decision) { - TaskID task_id = pair.first; - ClientID client_id = pair.second; + for (const auto &task_client_pair : policy_decision) { + TaskID task_id = task_client_pair.first; + ClientID client_id = task_client_pair.second; RAY_LOG(DEBUG) << task_id << " --> " << client_id; } #endif @@ -774,10 +812,10 @@ void NodeManager::ScheduleTasks() { // Extract decision for this local scheduler. std::unordered_set local_task_ids; // Iterate over (taskid, clientid) pairs, extract tasks assigned to the local node. - for (const auto &task_schedule : policy_decision) { - const TaskID task_id = task_schedule.first; - const ClientID client_id = task_schedule.second; - if (client_id == gcs_client_->client_table().GetLocalClientId()) { + for (const auto &task_client_pair : policy_decision) { + const TaskID &task_id = task_client_pair.first; + const ClientID &client_id = task_client_pair.second; + if (client_id == local_client_id) { local_task_ids.insert(task_id); } else { // TODO(atumanov): need a better interface for task exit on forward. @@ -801,9 +839,34 @@ void NodeManager::ScheduleTasks() { // manager. TaskDependencyManager::TaskPending() is assumed to be idempotent. // TODO(atumanov): evaluate performance implications of registering all new tasks on // submission vs. registering remaining queued placeable tasks here. + std::unordered_set move_task_set; for (const auto &task : local_queues_.GetPlaceableTasks()) { task_dependency_manager_.TaskPending(task); + move_task_set.insert(task.GetTaskSpecification().TaskId()); + // Assert that this placeable task is not feasible locally (necessary but not + // sufficient). + RAY_CHECK(!task.GetTaskSpecification().GetRequiredResources().IsSubset( + cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()] + .GetTotalResources())); } + + // Assumption: all remaining placeable tasks are infeasible and are moved to the + // infeasible task queue. Infeasible task queue is checked when new nodes join. + local_queues_.MoveTasks(move_task_set, TaskState::PLACEABLE, TaskState::INFEASIBLE); + // Check the invariant that no placeable tasks remain after a call to the policy. + RAY_CHECK(local_queues_.GetPlaceableTasks().size() == 0); +} + +bool NodeManager::CheckDependencyManagerInvariant() const { + std::vector pending_task_ids = task_dependency_manager_.GetPendingTasks(); + // Assert that each pending task in the task dependency manager is in one of the queues. + for (const auto &task_id : pending_task_ids) { + if (!local_queues_.HasTask(task_id)) { + return false; + } + } + // TODO(atumanov): perform the check in the opposite direction. + return true; } void NodeManager::TreatTaskAsFailed(const TaskSpecification &spec) { @@ -917,7 +980,10 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag } else { // (See design_docs/task_states.rst for the state transition diagram.) local_queues_.QueuePlaceableTasks({task}); - ScheduleTasks(); + ScheduleTasks(cluster_resource_map_); + DispatchTasks(); + // TODO(atumanov): assert that !placeable.isempty() => insufficient available + // resources locally. } } } @@ -949,8 +1015,6 @@ void NodeManager::HandleWorkerBlocked(std::shared_ptr worker) { local_queues_.QueueBlockedTasks({task}); worker->MarkBlocked(); - // Try to dispatch more tasks since the blocked worker released some - // resources. DispatchTasks(); } @@ -998,10 +1062,6 @@ void NodeManager::HandleWorkerUnblocked(std::shared_ptr worker) { } void NodeManager::EnqueuePlaceableTask(const Task &task) { - // Mark the task as pending. Once the task has finished execution, or once it - // has been forwarded to another node, the task must be marked as canceled in - // the TaskDependencyManager. - task_dependency_manager_.TaskPending(task); // TODO(atumanov): add task lookup hashmap and change EnqueuePlaceableTask to take // a vector of TaskIDs. Trigger MoveTask internally. // Subscribe to the task's dependencies. @@ -1017,6 +1077,10 @@ void NodeManager::EnqueuePlaceableTask(const Task &task) { } else { local_queues_.QueueWaitingTasks({task}); } + // Mark the task as pending. Once the task has finished execution, or once it + // has been forwarded to another node, the task must be marked as canceled in + // the TaskDependencyManager. + task_dependency_manager_.TaskPending(task); } void NodeManager::AssignTask(Task &task) { @@ -1049,11 +1113,10 @@ void NodeManager::AssignTask(Task &task) { RAY_LOG(DEBUG) << "Assigning task to worker with pid " << worker->Pid(); flatbuffers::FlatBufferBuilder fbb; - const ClientID &my_client_id = gcs_client_->client_table().GetLocalClientId(); - // Resource accounting: acquire resources for the assigned task. auto acquired_resources = local_available_resources_.Acquire(spec.GetRequiredResources()); + const auto &my_client_id = gcs_client_->client_table().GetLocalClientId(); RAY_CHECK( this->cluster_resource_map_[my_client_id].Acquire(spec.GetRequiredResources())); @@ -1119,6 +1182,7 @@ void NodeManager::AssignTask(Task &task) { // worker once one becomes available. // (See design_docs/task_states.rst for the state transition diagram.) local_queues_.QueueReadyTasks(std::vector({task})); + DispatchTasks(); } } @@ -1243,7 +1307,7 @@ void NodeManager::HandleObjectLocal(const ObjectID &object_id) { // Transition tasks from waiting to scheduled. // (See design_docs/task_states.rst for the state transition diagram.) local_queues_.MoveTasks(ready_task_id_set, TaskState::WAITING, TaskState::READY); - // New scheduled tasks appeared in the queue, try to dispatch them. + // New ready tasks appeared in the queue, try to dispatch them. DispatchTasks(); // Check that remaining tasks that could not be transitioned are blocked @@ -1271,6 +1335,9 @@ void NodeManager::HandleObjectMissing(const ObjectID &object_id) { local_queues_.FilterState(waiting_task_id_set, TaskState::RUNNING); local_queues_.FilterState(waiting_task_id_set, TaskState::DRIVER); RAY_CHECK(waiting_task_id_set.empty()); + // Moving ready tasks to waiting may have changed the load, making space for placing + // new tasks locally. + ScheduleTasks(cluster_resource_map_); } } @@ -1317,7 +1384,8 @@ void NodeManager::ForwardTaskOrResubmit(const Task &task, // The task is not for an actor and may therefore be placed on another // node immediately. Send it to the scheduling policy to be placed again. local_queues_.QueuePlaceableTasks({task}); - ScheduleTasks(); + ScheduleTasks(cluster_resource_map_); + DispatchTasks(); } } } diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 809ab02c2..82da8ab3b 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -45,6 +45,9 @@ class NodeManager { std::shared_ptr gcs_client); /// Process a new client connection. + /// + /// \param client The client to process. + /// \return Void. void ProcessNewClient(LocalClientConnection &client); /// Process a message from a client. This method is responsible for @@ -54,30 +57,60 @@ class NodeManager { /// \param client The client that sent the message. /// \param message_type The message type (e.g., a flatbuffer enum). /// \param message A pointer to the message data. + /// \return Void. void ProcessClientMessage(const std::shared_ptr &client, int64_t message_type, const uint8_t *message); + /// Handle a new node manager connection. + /// + /// \param node_manager_client The connection to the remote node manager. + /// \return Void. void ProcessNewNodeManager(TcpClientConnection &node_manager_client); + /// Handle a message from a remote node manager. + /// + /// \param node_manager_client The connection to the remote node manager. + /// \param message_type The type of the message. + /// \param message The message contents. + /// \return Void. void ProcessNodeManagerMessage(TcpClientConnection &node_manager_client, int64_t message_type, const uint8_t *message); + /// Subscribe to the relevant GCS tables and set up handlers. + /// + /// \return Status indicating whether this was done successfully or not. ray::Status RegisterGcs(); private: /// Methods for handling clients. + /// Handler for the addition of a new GCS client. + /// + /// \param data Data associated with the new client. + /// \return Void. void ClientAdded(const ClientTableDataT &data); /// Handler for the removal of a GCS client. + /// \param client_data Data associated with the removed client. + /// \return Void. void ClientRemoved(const ClientTableDataT &client_data); /// Send heartbeats to the GCS. void Heartbeat(); /// Handler for a heartbeat notification from the GCS. + /// + /// \param client The GCS client. + /// \param id The ID of the node manager that sent the heartbeat. + /// \param data The heartbeat data including load information. + /// \return Void. void HeartbeatAdded(gcs::AsyncGcsClient *client, const ClientID &id, const HeartbeatTableDataT &data); /// Methods for task scheduling. - /// Enqueue a placeable task to wait on object dependencies or be ready for dispatch. + + /// Enqueue a placeable task to wait on object dependencies or be ready for + /// dispatch. + /// + /// \param task The task in question. + /// \return Void. void EnqueuePlaceableTask(const Task &task); /// This will treat the task as if it had been executed and failed. This is /// done by looping over the task return IDs and for each ID storing an object @@ -88,38 +121,80 @@ class NodeManager { /// \return Void. void TreatTaskAsFailed(const TaskSpecification &spec); /// Handle specified task's submission to the local node manager. + /// + /// \param task The task being submitted. + /// \param uncommitted_lineage The uncommitted lineage of the task. + /// \param forwarded True if the task has been forwarded from a different + /// node manager and false if it was submitted by a local worker. + /// \return Void. void SubmitTask(const Task &task, const Lineage &uncommitted_lineage, bool forwarded = false); /// Assign a task. The task is assumed to not be queued in local_queues_. + /// + /// \param task The task in question. + /// \return Void. void AssignTask(Task &task); /// Handle a worker finishing its assigned task. + /// + /// \param The worker that fiished the task. + /// \return Void. void FinishAssignedTask(Worker &worker); - /// Perform a placement decision on placeable tasks. - void ScheduleTasks(); + /// Make a placement decision for placeable tasks given the resource_map + /// provided. This will perform task state transitions and task forwarding. + /// + /// \param resource_map A mapping from node manager ID to an estimate of the + /// resources available to that node manager. Scheduling decisions will only + /// consider the local node manager and the node managers in the keys of the + /// resource_map argument. + /// \return Void. + void ScheduleTasks(std::unordered_map &resource_map); /// Handle a task whose return value(s) must be reconstructed. + /// + /// \param task_id The relevant task ID. + /// \return Void. void HandleTaskReconstruction(const TaskID &task_id); /// Resubmit a task for execution. This is a task that was previously already /// submitted to a raylet but which must now be re-executed. + /// + /// \param task The task being resubmitted. + /// \return Void. void ResubmitTask(const Task &task); /// Attempt to forward a task to a remote different node manager. If this /// fails, the task will be resubmit locally. /// /// \param task The task in question. /// \param node_manager_id The ID of the remote node manager. + /// \return Void. void ForwardTaskOrResubmit(const Task &task, const ClientID &node_manager_id); /// Forward a task to another node to execute. The task is assumed to not be /// queued in local_queues_. + /// + /// \param task The task to forward. + /// \param node_id The ID of the node to forward the task to. + /// \return A status indicating whether the forward succeeded or not. Note + /// that a status of OK is not a reliable indicator that the forward succeeded + /// or even that the remote node is still alive. ray::Status ForwardTask(const Task &task, const ClientID &node_id); /// Dispatch locally scheduled tasks. This attempts the transition from "scheduled" to /// "running" task state. void DispatchTasks(); /// Handle a worker becoming blocked in a `ray.get`. + /// + /// \param worker The worker that is blocked. + /// \return Void. void HandleWorkerBlocked(std::shared_ptr worker); /// Handle a worker exiting a `ray.get`. + /// + /// \param worker The worker that is unblocked. + /// \return Void. void HandleWorkerUnblocked(std::shared_ptr worker); /// Methods for actor scheduling. /// Handler for the creation of an actor, possibly on a remote node. + /// + /// \param actor_id The actor ID of the actor that was created. + /// \param data Data associated with the actor creation event. + /// \return Void. void HandleActorCreation(const ActorID &actor_id, const std::vector &data); @@ -134,6 +209,7 @@ class NodeManager { /// \param tasks A list of tasks to extract from. /// \param tasks_to_remove The task IDs of the extracted tasks are inserted in /// this vector. + /// \return Void. void GetActorTasksFromList(const ActorID &actor_id, const std::list &tasks, std::unordered_set &tasks_to_remove); @@ -146,15 +222,32 @@ class NodeManager { /// Handle an object becoming local. This updates any local accounting, but /// does not write to any global accounting in the GCS. + /// + /// \param object_id The object that is locally available. + /// \return Void. void HandleObjectLocal(const ObjectID &object_id); /// Handle an object that is no longer local. This updates any local /// accounting, but does not write to any global accounting in the GCS. + /// + /// \param object_id The object that has been evicted locally. + /// \return Void. void HandleObjectMissing(const ObjectID &object_id); /// Handles updates to driver table. + /// + /// \param id An unused value. TODO(rkn): Should this be removed? + /// \param driver_data Data associated with a driver table event. + /// \return Void. void HandleDriverTableUpdate(const ClientID &id, const std::vector &driver_data); + /// Check if certain invariants associated with the task dependency manager + /// and the local queues are satisfied. This is only used for debugging + /// purposes. + /// + /// \return True if the invariants are satisfied and false otherwise. + bool CheckDependencyManagerInvariant() const; + boost::asio::io_service &io_service_; ObjectManager &object_manager_; /// A Plasma object store client. This is used exclusively for creating new diff --git a/src/ray/raylet/scheduling_policy.cc b/src/ray/raylet/scheduling_policy.cc index 0b0dbf855..0ed4efb27 100644 --- a/src/ray/raylet/scheduling_policy.cc +++ b/src/ray/raylet/scheduling_policy.cc @@ -13,12 +13,14 @@ SchedulingPolicy::SchedulingPolicy(const SchedulingQueue &scheduling_queue) gen_(std::chrono::high_resolution_clock::now().time_since_epoch().count()) {} std::unordered_map SchedulingPolicy::Schedule( - const std::unordered_map &cluster_resources, - const ClientID &local_client_id, const std::vector &others) { + std::unordered_map &cluster_resources, + const ClientID &local_client_id) { // The policy decision to be returned. std::unordered_map decision; // TODO(atumanov): protect DEBUG code blocks with ifdef DEBUG RAY_LOG(DEBUG) << "[Schedule] cluster resource map: "; + +#ifndef NDEBUG for (const auto &client_resource_pair : cluster_resources) { // pair = ClientID, SchedulingResources const ClientID &client_id = client_resource_pair.first; @@ -26,7 +28,10 @@ std::unordered_map SchedulingPolicy::Schedule( RAY_LOG(DEBUG) << "client_id: " << client_id << " " << resources.GetAvailableResources().ToString(); } +#endif + // We expect all placeable tasks to be placed on exit from this policy method. + RAY_CHECK(scheduling_queue_.GetPlaceableTasks().size() <= 1); // Iterate over running tasks, get their resource demand and try to schedule. for (const auto &t : scheduling_queue_.GetPlaceableTasks()) { // Get task's resource demand @@ -36,12 +41,8 @@ std::unordered_map SchedulingPolicy::Schedule( << " numforwards=" << t.GetTaskExecutionSpec().NumForwards() << " resources=" << t.GetTaskSpecification().GetRequiredResources().ToString(); - // TODO(atumanov): replace the simple spillback policy with exponential backoff based - // policy. - if (t.GetTaskExecutionSpec().NumForwards() >= 1) { - decision[task_id] = local_client_id; - continue; - } + + // TODO(atumanov): try to place tasks locally first. // Construct a set of viable node candidates and randomly pick between them. // Get all the client id keys and randomly pick. std::vector client_keys; @@ -49,9 +50,15 @@ std::unordered_map SchedulingPolicy::Schedule( // pair = ClientID, SchedulingResources ClientID node_client_id = client_resource_pair.first; const auto &node_resources = client_resource_pair.second; - RAY_LOG(DEBUG) << "client_id " << node_client_id << " resources: " - << node_resources.GetAvailableResources().ToString(); - if (resource_demand.IsSubset(node_resources.GetTotalResources())) { + ResourceSet available_node_resources = + ResourceSet(node_resources.GetAvailableResources()); + available_node_resources.SubtractResourcesStrict(node_resources.GetLoadResources()); + RAY_LOG(DEBUG) << "client_id " << node_client_id + << " avail: " << node_resources.GetAvailableResources().ToString() + << " load: " << node_resources.GetLoadResources().ToString() + << " avail-load: " << available_node_resources.ToString(); + + if (resource_demand.IsSubset(available_node_resources)) { // This node is a feasible candidate. client_keys.push_back(node_client_id); } @@ -63,17 +70,81 @@ std::unordered_map SchedulingPolicy::Schedule( // TODO(atumanov): change uniform random to discrete, weighted by resource capacity. std::uniform_int_distribution distribution(0, client_keys.size() - 1); int client_key_index = distribution(gen_); - decision[task_id] = client_keys[client_key_index]; - RAY_LOG(DEBUG) << "[SchedulingPolicy] idx=" << client_key_index << " " << task_id - << " --> " << client_keys[client_key_index]; + const ClientID &dst_client_id = client_keys[client_key_index]; + decision[task_id] = dst_client_id; + // Update dst_client_id's load to keep track of remote task load until + // the next heartbeat. + ResourceSet new_load(cluster_resources[dst_client_id].GetLoadResources()); + new_load.AddResources(resource_demand); + cluster_resources[dst_client_id].SetLoadResources(std::move(new_load)); } else { - // There are no nodes that can feasibily execute this task. TODO(rkn): Propagate a - // warning to the user. - RAY_LOG(WARNING) << "This task requires " - << t.GetTaskSpecification().GetRequiredResources().ToString() - << ", but no nodes have the necessary resources."; + // If the task doesn't fit, place randomly subject to hard constraints. + for (const auto &client_resource_pair2 : cluster_resources) { + // pair = ClientID, SchedulingResources + ClientID node_client_id = client_resource_pair2.first; + const auto &node_resources = client_resource_pair2.second; + if (resource_demand.IsSubset(node_resources.GetTotalResources())) { + // This node is a feasible candidate. + client_keys.push_back(node_client_id); + } + } + // client candidate list constructed, pick randomly. + if (!client_keys.empty()) { + // Choose index at random. + // Initialize a uniform integer distribution over the key space. + // TODO(atumanov): change uniform random to discrete, weighted by resource + // capacity. + std::uniform_int_distribution distribution(0, client_keys.size() - 1); + int client_key_index = distribution(gen_); + const ClientID &dst_client_id = client_keys[client_key_index]; + decision[t.GetTaskSpecification().TaskId()] = dst_client_id; + // Update dst_client_id's load to keep track of remote task load until + // the next heartbeat. + ResourceSet new_load(cluster_resources[dst_client_id].GetLoadResources()); + new_load.AddResources(resource_demand); + cluster_resources[dst_client_id].SetLoadResources(std::move(new_load)); + } else { + // There are no nodes that can feasibly execute this task. The task remains + // placeable until cluster capacity becomes available. + // TODO(rkn): Propagate a warning to the user. + RAY_LOG(INFO) << "This task requires " + << t.GetTaskSpecification().GetRequiredResources().ToString() + << ", but no nodes have the necessary resources."; + } } } + + return decision; +} + +std::vector SchedulingPolicy::SpillOver( + SchedulingResources &remote_scheduling_resources) const { + // The policy decision to be returned. + std::vector decision; + + ResourceSet new_load(remote_scheduling_resources.GetLoadResources()); + + // Check if we can accommodate an infeasible task. + for (const auto &task : scheduling_queue_.GetInfeasibleTasks()) { + if (task.GetTaskSpecification().GetRequiredResources().IsSubset( + remote_scheduling_resources.GetTotalResources())) { + decision.push_back(task.GetTaskSpecification().TaskId()); + new_load.AddResources(task.GetTaskSpecification().GetRequiredResources()); + } + } + + for (const auto &task : scheduling_queue_.GetReadyTasks()) { + if (!task.GetTaskSpecification().IsActorTask()) { + if (task.GetTaskSpecification().GetRequiredResources().IsSubset( + remote_scheduling_resources.GetTotalResources())) { + decision.push_back(task.GetTaskSpecification().TaskId()); + new_load.AddResources(task.GetTaskSpecification().GetRequiredResources()); + break; + } + } + } + remote_scheduling_resources.SetLoadResources(std::move(new_load)); + return decision; } diff --git a/src/ray/raylet/scheduling_policy.h b/src/ray/raylet/scheduling_policy.h index 89a725243..b6dc27275 100644 --- a/src/ray/raylet/scheduling_policy.h +++ b/src/ray/raylet/scheduling_policy.h @@ -22,15 +22,21 @@ class SchedulingPolicy { /// \return Void. SchedulingPolicy(const SchedulingQueue &scheduling_queue); - /// Perform a scheduling operation, given a set of cluster resources and - /// producing a mapping of tasks to node managers. + /// \brief Perform a scheduling operation, given a set of cluster resources and + /// producing a mapping of tasks to raylets. /// - /// \param cluster_resources: a set of cluster resources representing - /// configured and current resource capacity on each node. - /// \return Scheduling decision, mapping tasks to node managers for placement. + /// \param cluster_resources: a set of cluster resources containing resource and load + /// information for some subset of the cluster. For all client IDs in the returned + /// placement map, the corresponding SchedulingResources::resources_load_ is + /// incremented by the aggregate resource demand of the tasks assigned to it. + /// \param local_client_id The ID of the node manager that owns this + /// SchedulingPolicy object. + /// \return Scheduling decision, mapping tasks to raylets for placement. std::unordered_map Schedule( - const std::unordered_map &cluster_resources, - const ClientID &local_client_id, const std::vector &others); + std::unordered_map &cluster_resources, + const ClientID &local_client_id); + + std::vector SpillOver(SchedulingResources &remote_scheduling_resources) const; /// \brief SchedulingPolicy destructor. virtual ~SchedulingPolicy(); diff --git a/src/ray/raylet/scheduling_queue.cc b/src/ray/raylet/scheduling_queue.cc index 7458aedb3..290857611 100644 --- a/src/ray/raylet/scheduling_queue.cc +++ b/src/ray/raylet/scheduling_queue.cc @@ -105,6 +105,31 @@ const std::list &SchedulingQueue::GetReadyTasks() const { return this->ready_tasks_.GetTasks(); } +const std::list &SchedulingQueue::GetInfeasibleTasks() const { + return this->infeasible_tasks_.GetTasks(); +} + +ResourceSet SchedulingQueue::GetQueueResources(const TaskQueue &task_queue) const { + // Iterate over all tasks of the specified queue and aggregate total resource + // demand in a resource set. + ResourceSet queue_resources; + for (const auto &task : task_queue.GetTasks()) { + queue_resources.AddResources(task.GetTaskSpecification().GetRequiredResources()); + } + return queue_resources; +} + +ResourceSet SchedulingQueue::GetReadyQueueResources() const { + return GetQueueResources(ready_tasks_); +} + +ResourceSet SchedulingQueue::GetResourceLoad() const { + ResourceSet load_resource_set; + load_resource_set.AddResources(GetReadyQueueResources()); + // TODO(atumanov): consider other types of tasks as part of load. + return load_resource_set; +} + const std::list &SchedulingQueue::GetRunningTasks() const { return this->running_tasks_.GetTasks(); } @@ -131,6 +156,9 @@ void SchedulingQueue::FilterState(std::unordered_set &task_ids, case TaskState::BLOCKED: FilterStateFromQueue(blocked_tasks_, task_ids, filter_state); break; + case TaskState::INFEASIBLE: + FilterStateFromQueue(infeasible_tasks_, task_ids, filter_state); + break; case TaskState::DRIVER: { const auto driver_ids = GetDriverTaskIds(); for (auto it = task_ids.begin(); it != task_ids.end();) { @@ -158,6 +186,7 @@ std::vector SchedulingQueue::RemoveTasks(std::unordered_set &task_ RemoveTasksFromQueue(ready_tasks_, task_ids, removed_tasks); RemoveTasksFromQueue(running_tasks_, task_ids, removed_tasks); RemoveTasksFromQueue(blocked_tasks_, task_ids, removed_tasks); + RemoveTasksFromQueue(infeasible_tasks_, task_ids, removed_tasks); RAY_CHECK(task_ids.size() == 0); return removed_tasks; @@ -191,6 +220,9 @@ void SchedulingQueue::MoveTasks(std::unordered_set &task_ids, TaskState case TaskState::BLOCKED: RemoveTasksFromQueue(blocked_tasks_, task_ids, removed_tasks); break; + case TaskState::INFEASIBLE: + RemoveTasksFromQueue(infeasible_tasks_, task_ids, removed_tasks); + break; default: RAY_LOG(FATAL) << "Attempting to move tasks from unrecognized state " << static_cast::type>(src_state); @@ -212,6 +244,9 @@ void SchedulingQueue::MoveTasks(std::unordered_set &task_ids, TaskState case TaskState::BLOCKED: QueueTasks(blocked_tasks_, removed_tasks); break; + case TaskState::INFEASIBLE: + QueueTasks(infeasible_tasks_, removed_tasks); + break; default: RAY_LOG(FATAL) << "Attempting to move tasks to unrecognized state " << static_cast::type>(dst_state); @@ -227,7 +262,7 @@ bool SchedulingQueue::HasTask(const TaskID &task_id) const { return (methods_waiting_for_actor_creation_.HasTask(task_id) || waiting_tasks_.HasTask(task_id) || placeable_tasks_.HasTask(task_id) || ready_tasks_.HasTask(task_id) || running_tasks_.HasTask(task_id) || - blocked_tasks_.HasTask(task_id)); + blocked_tasks_.HasTask(task_id) || infeasible_tasks_.HasTask(task_id)); } void SchedulingQueue::QueueWaitingTasks(const std::vector &tasks) { @@ -264,6 +299,26 @@ const std::unordered_set &SchedulingQueue::GetDriverTaskIds() const { return driver_task_ids_; } +const std::string SchedulingQueue::ToString() const { + std::string result; + + result += "placeable_tasks_ size is " + + std::to_string(placeable_tasks_.GetTasks().size()) + "\n"; + result += + "waiting_tasks_ size is " + std::to_string(waiting_tasks_.GetTasks().size()) + "\n"; + result += + "ready_tasks_ size is " + std::to_string(ready_tasks_.GetTasks().size()) + "\n"; + result += + "running_tasks_ size is " + std::to_string(running_tasks_.GetTasks().size()) + "\n"; + result += + "blocked_tasks_ size is " + std::to_string(blocked_tasks_.GetTasks().size()) + "\n"; + result += "infeasible_tasks_ size is " + + std::to_string(infeasible_tasks_.GetTasks().size()) + "\n"; + result += "methods_waiting_for_actor_creation_ size is " + + std::to_string(methods_waiting_for_actor_creation_.GetTasks().size()) + "\n"; + return result; +} + } // namespace raylet } // namespace ray diff --git a/src/ray/raylet/scheduling_queue.h b/src/ray/raylet/scheduling_queue.h index 13dc903e2..6b9635586 100644 --- a/src/ray/raylet/scheduling_queue.h +++ b/src/ray/raylet/scheduling_queue.h @@ -12,7 +12,16 @@ namespace ray { namespace raylet { -enum class TaskState { INIT, PLACEABLE, WAITING, READY, RUNNING, BLOCKED, DRIVER }; +enum class TaskState { + INIT, + PLACEABLE, + WAITING, + READY, + RUNNING, + BLOCKED, + DRIVER, + INFEASIBLE +}; /// \class SchedulingQueue /// @@ -51,6 +60,18 @@ class SchedulingQueue { /// dependencies local and that are waiting to be scheduled. const std::list &GetPlaceableTasks() const; + /// Get the queue of tasks in the infeasible state. + /// + /// \return A const reference to the queue of tasks whose resource + /// requirements are not satisfied by any node in the cluster. + const std::list &GetInfeasibleTasks() const; + + /// \brief Return an aggregate resource set for all tasks exerting load on this raylet. + /// + /// \return A resource set with aggregate resource information about resource load on + /// this raylet. + ResourceSet GetResourceLoad() const; + /// Get the queue of tasks in the ready state. /// /// \return A const reference to the queue of tasks ready @@ -153,6 +174,18 @@ class SchedulingQueue { /// \param filter_state The task state to filter out. void FilterState(std::unordered_set &task_ids, TaskState filter_state) const; + /// \brief Return all resource demand associated with the ready queue. + /// + /// \return Aggregate resource demand from ready tasks. + ResourceSet GetReadyQueueResources() const; + + /// Return a human-readable string indicating the number of tasks in each + /// queue. + /// + /// \return A string that can be used to display the contents of the queues + /// for debugging purposes. + const std::string ToString() const; + class TaskQueue { public: /// Creating a task queue. @@ -214,9 +247,18 @@ class SchedulingQueue { /// Tasks that were dispatched to a worker but are blocked on a data /// dependency that was missing at runtime. TaskQueue blocked_tasks_; + /// Tasks that require resources that are not available on any of the nodes + /// in the cluster. + TaskQueue infeasible_tasks_; /// The set of currently running driver tasks. These are empty tasks that are /// started by a driver process on initialization. std::unordered_set driver_task_ids_; + + /// \brief Return all resource demand associated with the specified task queue. + /// + /// \param task_queue The task queue for which aggregate resource demand is calculated. + /// \return Aggregate resource demand. + ResourceSet GetQueueResources(const TaskQueue &task_queue) const; }; } // namespace raylet diff --git a/src/ray/raylet/scheduling_resources.cc b/src/ray/raylet/scheduling_resources.cc index ec5e2ba3d..49519c493 100644 --- a/src/ray/raylet/scheduling_resources.cc +++ b/src/ray/raylet/scheduling_resources.cc @@ -66,13 +66,13 @@ bool ResourceSet::IsEqual(const ResourceSet &rhs) const { } bool ResourceSet::AddResource(const std::string &resource_name, double capacity) { - this->resource_capacity_[resource_name] = capacity; + resource_capacity_[resource_name] = capacity; return true; } bool ResourceSet::RemoveResource(const std::string &resource_name) { throw std::runtime_error("Method not implemented"); } -bool ResourceSet::SubtractResources(const ResourceSet &other) { +bool ResourceSet::SubtractResourcesStrict(const ResourceSet &other) { // Subtract the resources and track whether a resource goes below zero. bool oversubscribed = false; for (const auto &resource_pair : other.GetResourceMap()) { @@ -88,20 +88,31 @@ bool ResourceSet::SubtractResources(const ResourceSet &other) { return !oversubscribed; } -bool ResourceSet::AddResources(const ResourceSet &other) { +// Perform a left join. +bool ResourceSet::AddResourcesStrict(const ResourceSet &other) { // Return failure if attempting to perform vector addition with unknown labels. - // TODO(atumanov): make the implementation atomic. Currently, if false is returned - // the resource capacity may be partially mutated. To reverse, call SubtractResources. + for (const auto &resource_pair : other.GetResourceMap()) { + const std::string &resource_label = resource_pair.first; + const double &resource_capacity = resource_pair.second; + RAY_CHECK(resource_capacity_.count(resource_label) != 0); + resource_capacity_[resource_label] += resource_capacity; + } + return true; +} + +// Perform an outer join. +void ResourceSet::AddResources(const ResourceSet &other) { for (const auto &resource_pair : other.GetResourceMap()) { const std::string &resource_label = resource_pair.first; const double &resource_capacity = resource_pair.second; if (resource_capacity_.count(resource_label) == 0) { - return false; + // Add the new label if not found. + RAY_CHECK(AddResource(resource_label, resource_capacity)); } else { + // Increment the resource by its capacity. resource_capacity_[resource_label] += resource_capacity; } } - return true; } bool ResourceSet::GetResource(const std::string &resource_name, double *value) const { @@ -426,10 +437,14 @@ std::vector> ResourceIdSet::ToF /// SchedulingResources class implementation SchedulingResources::SchedulingResources() - : resources_total_(ResourceSet()), resources_available_(ResourceSet()) {} + : resources_total_(ResourceSet()), + resources_available_(ResourceSet()), + resources_load_(ResourceSet()) {} SchedulingResources::SchedulingResources(const ResourceSet &total) - : resources_total_(total), resources_available_(total) {} + : resources_total_(total), + resources_available_(total), + resources_load_(ResourceSet()) {} SchedulingResources::~SchedulingResources() {} @@ -457,14 +472,22 @@ const ResourceSet &SchedulingResources::GetTotalResources() const { return this->resources_total_; } +void SchedulingResources::SetLoadResources(ResourceSet &&newset) { + resources_load_ = newset; +} + +const ResourceSet &SchedulingResources::GetLoadResources() const { + return resources_load_; +} + // Return specified resources back to SchedulingResources. bool SchedulingResources::Release(const ResourceSet &resources) { - return this->resources_available_.AddResources(resources); + return this->resources_available_.AddResourcesStrict(resources); } // Take specified resources from SchedulingResources. bool SchedulingResources::Acquire(const ResourceSet &resources) { - return this->resources_available_.SubtractResources(resources); + return this->resources_available_.SubtractResourcesStrict(resources); } } // namespace raylet diff --git a/src/ray/raylet/scheduling_resources.h b/src/ray/raylet/scheduling_resources.h index 153118a8e..bac293366 100644 --- a/src/ray/raylet/scheduling_resources.h +++ b/src/ray/raylet/scheduling_resources.h @@ -80,18 +80,27 @@ class ResourceSet { /// \return True, if the resource was successfully removed. False otherwise. bool RemoveResource(const std::string &resource_name); - /// \brief Add a set of resources to the current set of resources. + /// \brief Add a set of resources to the current set of resources only if the resource + /// labels match. /// /// \param other: The other resource set to add. /// \return True if the resource set was added successfully. False otherwise. - bool AddResources(const ResourceSet &other); + bool AddResourcesStrict(const ResourceSet &other); - /// \brief Subtract a set of resources from the current set of resources. + /// \brief Aggregate resources from the other set into this set, adding any missing + /// resource labels to this set. + /// + /// \param other: The other resource set to add. + /// \return Void. + void AddResources(const ResourceSet &other); + + /// \brief Subtract a set of resources from the current set of resources, only if + /// resource labels match. /// /// \param other: The resource set to subtract from the current resource set. /// \return True if the resource set was subtracted successfully. /// False otherwise. - bool SubtractResources(const ResourceSet &other); + bool SubtractResourcesStrict(const ResourceSet &other); /// Return the capacity value associated with the specified resource. /// @@ -340,6 +349,17 @@ class SchedulingResources { const ResourceSet &GetTotalResources() const; + /// \brief Overwrite information about resource load with new resource load set. + /// + /// \param newset: The set of resources that replaces resource load information. + /// \return Void. + void SetLoadResources(ResourceSet &&newset); + + /// \brief Request the resource load information. + /// + /// \return Immutable set of resources describing the load information. + const ResourceSet &GetLoadResources() const; + /// \brief Release the amount of resources specified. /// /// \param resources: the amount of resources to be released. @@ -359,7 +379,8 @@ class SchedulingResources { ResourceSet resources_total_; /// Dynamic resource capacity (e.g., dynamic_resources). ResourceSet resources_available_; - /// gpu_map - replace with ResourceMap (for generality). + /// Resource load. + ResourceSet resources_load_; }; } // namespace raylet diff --git a/src/ray/raylet/task_dependency_manager.cc b/src/ray/raylet/task_dependency_manager.cc index ddd00c95c..247a83fcb 100644 --- a/src/ray/raylet/task_dependency_manager.cc +++ b/src/ray/raylet/task_dependency_manager.cc @@ -106,7 +106,7 @@ std::vector TaskDependencyManager::HandleObjectLocal( std::vector TaskDependencyManager::HandleObjectMissing( const ray::ObjectID &object_id) { - // Add the object to the table of locally available objects. + // Remove the object from the table of locally available objects. auto erased = local_objects_.erase(object_id); RAY_CHECK(erased == 1); @@ -124,6 +124,9 @@ std::vector TaskDependencyManager::HandleObjectMissing( // missing. if (task_entry.num_missing_dependencies == 0) { waiting_task_ids.push_back(dependent_task_id); + // During normal execution we should be able to include the check + // RAY_CHECK(pending_tasks_.count(dependent_task_id) == 1); + // However, this invariant will not hold during unit test execution. } task_entry.num_missing_dependencies++; } @@ -204,6 +207,15 @@ void TaskDependencyManager::UnsubscribeDependencies(const TaskID &task_id) { } } +std::vector TaskDependencyManager::GetPendingTasks() const { + std::vector keys; + keys.reserve(pending_tasks_.size()); + for (const auto &id_task_pair : pending_tasks_) { + keys.push_back(id_task_pair.first); + } + return keys; +} + void TaskDependencyManager::TaskPending(const Task &task) { TaskID task_id = task.GetTaskSpecification().TaskId(); diff --git a/src/ray/raylet/task_dependency_manager.h b/src/ray/raylet/task_dependency_manager.h index 46caca6cc..ea795ad43 100644 --- a/src/ray/raylet/task_dependency_manager.h +++ b/src/ray/raylet/task_dependency_manager.h @@ -99,6 +99,12 @@ class TaskDependencyManager { /// this object dependency. std::vector HandleObjectMissing(const ray::ObjectID &object_id); + /// Get a list of all Tasks currently marked as pending object dependencies in the task + /// dependency manager. + /// + /// \return Return a vector of TaskIDs for tasks registered as pending. + std::vector GetPendingTasks() const; + private: using ObjectDependencyMap = std::unordered_map>; diff --git a/test/actor_test.py b/test/actor_test.py index e51401b1b..f844a83de 100644 --- a/test/actor_test.py +++ b/test/actor_test.py @@ -766,9 +766,6 @@ class ActorsWithGPUs(unittest.TestCase): @unittest.skipIf( os.environ.get('RAY_USE_NEW_GCS', False), "Crashing with new GCS API.") - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testActorGPUs(self): num_local_schedulers = 3 num_gpus_per_scheduler = 4 @@ -812,9 +809,6 @@ class ActorsWithGPUs(unittest.TestCase): ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) assert ready_ids == [] - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testActorMultipleGPUs(self): num_local_schedulers = 3 num_gpus_per_scheduler = 5 @@ -887,9 +881,6 @@ class ActorsWithGPUs(unittest.TestCase): ready_ids, _ = ray.wait([a.get_location_and_ids.remote()], timeout=10) assert ready_ids == [] - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testActorDifferentNumbersOfGPUs(self): # Test that we can create actors on two nodes that have different # numbers of GPUs. @@ -982,9 +973,6 @@ class ActorsWithGPUs(unittest.TestCase): assert ready_ids == [] @unittest.skipIf(sys.version_info < (3, 0), "This test requires Python 3.") - @unittest.skipIf( - os.environ.get("RAY_USE_XRAY") == "1", - "This test does not work with xray yet.") def testActorsAndTasksWithGPUs(self): num_local_schedulers = 3 num_gpus_per_scheduler = 6 diff --git a/test/runtest.py b/test/runtest.py index b94cf4943..ac96c82c5 100644 --- a/test/runtest.py +++ b/test/runtest.py @@ -2150,6 +2150,7 @@ class SchedulingAlgorithm(unittest.TestCase): @ray.remote def f(x): + time.sleep(0.010) return ray.worker.global_worker.plasma_client.store_socket_name # This object will be local to one of the local schedulers. Make sure diff --git a/test/stress_tests.py b/test/stress_tests.py index 59c155eb2..95b0d9192 100644 --- a/test/stress_tests.py +++ b/test/stress_tests.py @@ -459,11 +459,21 @@ def test_nondeterministic_task(ray_start_reconstruction): for error in errors) +@pytest.fixture +def ray_start_driver_put_errors(): + plasma_store_memory = 10**9 + # Start the Ray processes. + ray.init(num_cpus=1, object_store_memory=plasma_store_memory) + yield plasma_store_memory + # The code after the yield will run as teardown code. + ray.shutdown() + + @pytest.mark.skipif( os.environ.get("RAY_USE_NEW_GCS") == "on", reason="Failing with new GCS API on Linux.") -def test_driver_put_errors(ray_start_reconstruction): - _, _, plasma_store_memory, _ = ray_start_reconstruction +def test_driver_put_errors(ray_start_driver_put_errors): + plasma_store_memory = ray_start_driver_put_errors # Define the size of one task's return argument so that the combined # sum of all objects' sizes is at least twice the plasma stores' # combined allotted memory.