diff --git a/python/ray/tests/test_object_manager.py b/python/ray/tests/test_object_manager.py index e38733f62..004b1c2f6 100644 --- a/python/ray/tests/test_object_manager.py +++ b/python/ray/tests/test_object_manager.py @@ -296,9 +296,6 @@ def test_pull_request_retry(shutdown_only): ray.get(driver.remote()) -@pytest.mark.skip( - reason="This hangs due to a deadlock between a worker getting its " - "arguments and the node pulling arguments for the next task queued.") @pytest.mark.timeout(30) def test_pull_bundles_admission_control(shutdown_only): cluster = Cluster() @@ -333,9 +330,6 @@ def test_pull_bundles_admission_control(shutdown_only): ray.get(tasks) -@pytest.mark.skip( - reason="This hangs due to a deadlock between a worker getting its " - "arguments and the node pulling arguments for the next task queued.") @pytest.mark.timeout(30) def test_pull_bundles_admission_control_dynamic(shutdown_only): # This test is the same as test_pull_bundles_admission_control, except that @@ -358,11 +352,13 @@ def test_pull_bundles_admission_control_dynamic(shutdown_only): cluster.wait_for_nodes() @ray.remote - def foo(*args): + def foo(i, *args): + print("foo", i) return @ray.remote - def allocate(*args): + def allocate(i): + print("allocate", i) return np.zeros(object_size, dtype=np.uint8) args = [] @@ -373,8 +369,8 @@ def test_pull_bundles_admission_control_dynamic(shutdown_only): ] args.append(task_args) - tasks = [foo.remote(*task_args) for task_args in args] - allocated = [allocate.remote() for _ in range(num_objects)] + tasks = [foo.remote(i, *task_args) for i, task_args in enumerate(args)] + allocated = [allocate.remote(i) for i in range(num_objects)] ray.get(tasks) del allocated diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 3f5b5f7ae..242799dc9 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -618,9 +618,6 @@ def test_release_during_plasma_fetch(object_spilling_config, shutdown_only): do_test_release_resource(object_spilling_config, expect_released=True) -@pytest.mark.skip( - reason="This hangs due to a deadlock between a worker getting its " - "arguments and the node pulling arguments for the next task queued.") @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") @pytest.mark.timeout(30) diff --git a/src/ray/raylet/dependency_manager.cc b/src/ray/raylet/dependency_manager.cc index 988893bea..7c9faf642 100644 --- a/src/ray/raylet/dependency_manager.cc +++ b/src/ray/raylet/dependency_manager.cc @@ -185,12 +185,6 @@ bool DependencyManager::RequestTaskDependencies( return task_entry.num_missing_dependencies == 0; } -bool DependencyManager::IsTaskReady(const TaskID &task_id) const { - auto task_entry = queued_task_requests_.find(task_id); - RAY_CHECK(task_entry != queued_task_requests_.end()); - return task_entry->second.num_missing_dependencies == 0; -} - void DependencyManager::RemoveTaskDependencies(const TaskID &task_id) { RAY_LOG(DEBUG) << "Removing dependencies for task " << task_id; auto task_entry = queued_task_requests_.find(task_id); diff --git a/src/ray/raylet/dependency_manager.h b/src/ray/raylet/dependency_manager.h index 1e7ddfcb1..903a9893a 100644 --- a/src/ray/raylet/dependency_manager.h +++ b/src/ray/raylet/dependency_manager.h @@ -37,7 +37,6 @@ class TaskDependencyManagerInterface { virtual bool RequestTaskDependencies( const TaskID &task_id, const std::vector &required_objects) = 0; - virtual bool IsTaskReady(const TaskID &task_id) const = 0; virtual void RemoveTaskDependencies(const TaskID &task_id) = 0; virtual ~TaskDependencyManagerInterface(){}; }; @@ -131,14 +130,6 @@ class DependencyManager : public TaskDependencyManagerInterface { bool RequestTaskDependencies(const TaskID &task_id, const std::vector &required_objects); - /// Check whether a task is ready to run. The task ID must have been - /// previously added by the caller. - /// - /// \param task_id The ID of the task to check. - /// \return Whether all of the dependencies for the task are - /// local. - bool IsTaskReady(const TaskID &task_id) const; - /// Cancel a task's dependencies. We will no longer attempt to fetch any /// remote dependencies, if no other task or worker requires them. /// diff --git a/src/ray/raylet/dependency_manager_test.cc b/src/ray/raylet/dependency_manager_test.cc index c6d0ab2ee..6ea260bc3 100644 --- a/src/ray/raylet/dependency_manager_test.cc +++ b/src/ray/raylet/dependency_manager_test.cc @@ -89,7 +89,6 @@ TEST_F(DependencyManagerTest, TestSimpleTask) { dependency_manager_.RequestTaskDependencies(task_id, ObjectIdsToRefs(arguments)); ASSERT_FALSE(ready); ASSERT_EQ(object_manager_mock_.active_requests.size(), 1); - ASSERT_FALSE(dependency_manager_.IsTaskReady(task_id)); // For each argument, tell the task dependency manager that the argument is // local. All arguments should be canceled as they become available locally. @@ -98,15 +97,12 @@ TEST_F(DependencyManagerTest, TestSimpleTask) { } auto ready_task_ids = dependency_manager_.HandleObjectLocal(arguments[0]); ASSERT_TRUE(ready_task_ids.empty()); - ASSERT_FALSE(dependency_manager_.IsTaskReady(task_id)); ready_task_ids = dependency_manager_.HandleObjectLocal(arguments[1]); ASSERT_TRUE(ready_task_ids.empty()); - ASSERT_FALSE(dependency_manager_.IsTaskReady(task_id)); // The task is ready to run. ready_task_ids = dependency_manager_.HandleObjectLocal(arguments[2]); ASSERT_EQ(ready_task_ids.size(), 1); ASSERT_EQ(ready_task_ids.front(), task_id); - ASSERT_TRUE(dependency_manager_.IsTaskReady(task_id)); // Remove the task. dependency_manager_.RemoveTaskDependencies(task_id); @@ -127,7 +123,6 @@ TEST_F(DependencyManagerTest, TestMultipleTasks) { bool ready = dependency_manager_.RequestTaskDependencies( task_id, ObjectIdsToRefs({argument_id})); ASSERT_FALSE(ready); - ASSERT_FALSE(dependency_manager_.IsTaskReady(task_id)); // The object should be requested from the object manager once for each task. ASSERT_EQ(object_manager_mock_.active_requests.size(), i + 1); } @@ -139,7 +134,6 @@ TEST_F(DependencyManagerTest, TestMultipleTasks) { std::unordered_set added_tasks(dependent_tasks.begin(), dependent_tasks.end()); for (auto &id : ready_task_ids) { ASSERT_TRUE(added_tasks.erase(id)); - ASSERT_TRUE(dependency_manager_.IsTaskReady(id)); } ASSERT_TRUE(added_tasks.empty()); @@ -166,7 +160,6 @@ TEST_F(DependencyManagerTest, TestTaskArgEviction) { bool ready = dependency_manager_.RequestTaskDependencies(task_id, ObjectIdsToRefs(arguments)); ASSERT_FALSE(ready); - ASSERT_FALSE(dependency_manager_.IsTaskReady(task_id)); // Tell the task dependency manager that each of the arguments is now // available. @@ -183,7 +176,6 @@ TEST_F(DependencyManagerTest, TestTaskArgEviction) { ASSERT_TRUE(ready_tasks.empty()); } } - ASSERT_TRUE(dependency_manager_.IsTaskReady(task_id)); // Simulate each of the arguments getting evicted. Each object should now be // considered remote. @@ -203,7 +195,6 @@ TEST_F(DependencyManagerTest, TestTaskArgEviction) { // the waiting state. ASSERT_TRUE(waiting_tasks.empty()); } - ASSERT_FALSE(dependency_manager_.IsTaskReady(task_id)); } // Tell the task dependency manager that each of the arguments is available @@ -221,7 +212,6 @@ TEST_F(DependencyManagerTest, TestTaskArgEviction) { ASSERT_TRUE(ready_tasks.empty()); } } - ASSERT_TRUE(dependency_manager_.IsTaskReady(task_id)); dependency_manager_.RemoveTaskDependencies(task_id); AssertNoLeaks(); diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index e1ac5eb67..251e28e26 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -222,7 +222,11 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self self_node_id_, std::dynamic_pointer_cast(cluster_resource_scheduler_), dependency_manager_, is_owner_alive, get_node_info_func, announce_infeasible_task, - worker_pool_, leased_workers_)); + worker_pool_, leased_workers_, + [this](const std::vector &object_ids, + std::vector> *results) { + return GetObjectsFromPlasma(object_ids, results); + })); placement_group_resource_manager_ = std::make_shared( std::dynamic_pointer_cast( @@ -1242,8 +1246,9 @@ void NodeManager::DisconnectClient(const std::shared_ptr &clie if ((!task_id.IsNil() || !actor_id.IsNil()) && !worker->IsDead()) { // If the worker was an actor, it'll be cleaned by GCS. if (actor_id.IsNil()) { + // Return the resources that were being used by this worker. Task task; - static_cast(local_queues_.RemoveTask(task_id, &task)); + cluster_task_manager_->TaskFinished(worker, &task); } if (disconnect_type == rpc::WorkerExitType::SYSTEM_ERROR_EXIT) { @@ -2365,6 +2370,33 @@ std::string compact_tag_string(const opencensus::stats::ViewDescriptor &view, return result.str(); } +bool NodeManager::GetObjectsFromPlasma(const std::vector &object_ids, + std::vector> *results) { + // Pin the objects in plasma by getting them and holding a reference to + // the returned buffer. + // NOTE: the caller must ensure that the objects already exist in plasma before + // sending a PinObjectIDs request. + std::vector plasma_results; + // TODO(swang): This `Get` has a timeout of 0, so the plasma store will not + // block when serving the request. However, if the plasma store is under + // heavy load, then this request can still block the NodeManager event loop + // since we must wait for the plasma store's reply. We should consider using + // an `AsyncGet` instead. + if (!store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results).ok()) { + return false; + } + + for (const auto &plasma_result : plasma_results) { + if (plasma_result.data == nullptr) { + results->push_back(nullptr); + } else { + results->emplace_back(std::unique_ptr( + new RayObject(plasma_result.data, plasma_result.metadata, {}))); + } + } + return true; +} + void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, rpc::PinObjectIDsReply *reply, rpc::SendReplyCallback send_reply_callback) { @@ -2374,33 +2406,16 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, object_ids.push_back(ObjectID::FromBinary(object_id_binary)); } if (object_pinning_enabled_) { - // Pin the objects in plasma by getting them and holding a reference to - // the returned buffer. - // NOTE: the caller must ensure that the objects already exist in plasma before - // sending a PinObjectIDs request. - std::vector plasma_results; - // TODO(swang): This `Get` has a timeout of 0, so the plasma store will not - // block when serving the request. However, if the plasma store is under - // heavy load, then this request can still block the NodeManager event loop - // since we must wait for the plasma store's reply. We should consider using - // an `AsyncGet` instead. - if (!store_client_.Get(object_ids, /*timeout_ms=*/0, &plasma_results).ok()) { - RAY_LOG(WARNING) << "Failed to get objects to be pinned from object store."; + std::vector> results; + if (!GetObjectsFromPlasma(object_ids, &results)) { + RAY_LOG(WARNING) + << "Failed to get objects that should have been in the object store. These " + "objects may have been evicted while there are still references in scope."; // TODO(suquark): Maybe "Status::ObjectNotFound" is more accurate here. send_reply_callback(Status::Invalid("Failed to get objects."), nullptr, nullptr); return; } - - std::vector> objects; - for (int64_t i = 0; i < request.object_ids().size(); i++) { - if (plasma_results[i].data == nullptr) { - objects.push_back(nullptr); - } else { - objects.emplace_back(std::unique_ptr( - new RayObject(plasma_results[i].data, plasma_results[i].metadata, {}))); - } - } - local_object_manager_.PinObjects(object_ids, std::move(objects)); + local_object_manager_.PinObjects(object_ids, std::move(results)); } // Wait for the object to be freed by the owner, which keeps the ref count. local_object_manager_.WaitForObjectFree(request.owner_address(), object_ids); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index 3a68fcbae..606dc3ac6 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -647,6 +647,16 @@ class NodeManager : public rpc::NodeManagerServiceHandler, std::unordered_map> MakeTasksByClass( const std::vector &tasks) const; + /// Get pointers to objects stored in plasma. They will be + /// released once the returned references go out of scope. + /// + /// \param[in] object_ids The objects to get. + /// \param[out] results The pointers to objects stored in + /// plasma. + /// \return Whether the request was successful. + bool GetObjectsFromPlasma(const std::vector &object_ids, + std::vector> *results); + /////////////////////////////////////////////////////////////////////////////////////// //////////////////// Begin of the override methods of ClusterTaskManager ////////////// // The following methods are defined in node_manager.task.cc instead of node_manager.cc diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index a4dbff1f4..109833eb5 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -20,7 +20,10 @@ ClusterTaskManager::ClusterTaskManager( NodeInfoGetter get_node_info, std::function announce_infeasible_task, WorkerPoolInterface &worker_pool, - std::unordered_map> &leased_workers) + std::unordered_map> &leased_workers, + std::function &object_ids, + std::vector> *results)> + pin_task_arguments) : self_node_id_(self_node_id), cluster_resource_scheduler_(cluster_resource_scheduler), task_dependency_manager_(task_dependency_manager), @@ -31,7 +34,8 @@ ClusterTaskManager::ClusterTaskManager( RayConfig::instance().max_resource_shapes_per_load_report()), report_worker_backlog_(RayConfig::instance().report_worker_backlog()), worker_pool_(worker_pool), - leased_workers_(leased_workers) {} + leased_workers_(leased_workers), + pin_task_arguments_(pin_task_arguments) {} bool ClusterTaskManager::SchedulePendingTasks() { // Always try to schedule infeasible tasks in case they are now feasible. @@ -144,11 +148,36 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( auto &task = std::get<0>(work); auto &spec = task.GetTaskSpecification(); + std::vector> args; + bool success = true; + const auto &deps = spec.GetDependencyIds(); + if (!deps.empty()) { + // This gets refs to the arguments stored in plasma. The refs should be + // deleted once we no longer need to pin the arguments. + success = pin_task_arguments_(deps, &args); + if (!success) { + RAY_LOG(WARNING) << "Error getting task arguments from plasma store"; + } + for (size_t i = 0; i < deps.size(); i++) { + if (args[i] == nullptr) { + // This can happen if the task's arguments were all local at some + // point, but then at least one was evicted before the task could + // be dispatched to a worker. + RAY_LOG(INFO) + << "Task " << spec.TaskId() << " argument " << deps[i] + << " was evicted before the task could be dispatched. This can happen " + "when there are many objects needed on this node. The task will be " + "scheduled once all of its dependencies are local."; + success = false; + break; + } + } + } + // An argument was evicted since this task was added to the dispatch // queue. Move it back to the waiting queue. The caller is responsible // for notifying us when the task is unblocked again. - if (!spec.GetDependencies().empty() && - !task_dependency_manager_.IsTaskReady(spec.TaskId())) { + if (!success) { waiting_tasks_[spec.TaskId()] = std::move(*work_it); work_it = dispatch_queue.erase(work_it); continue; @@ -177,6 +206,12 @@ void ClusterTaskManager::DispatchScheduledTasksToWorkers( bool worker_leased; bool remove = AttemptDispatchWork(*work_it, worker, &worker_leased); if (worker_leased) { + // Pin the arguments while the lease is active. These will be erased + // once the lease is returned. + num_pinned_task_arguments_ += args.size(); + RAY_CHECK(pinned_task_arguments_.emplace(spec.TaskId(), std::move(args)).second) + << spec.TaskId(); + auto reply = std::get<1>(*work_it); auto callback = std::get<2>(*work_it); Dispatch(worker, leased_workers_, task, reply, callback); @@ -295,6 +330,10 @@ void ClusterTaskManager::TaskFinished(std::shared_ptr worker, Task *task) { RAY_CHECK(worker != nullptr && task != nullptr); *task = worker->GetAssignedTask(); + auto it = pinned_task_arguments_.find(task->GetTaskSpecification().TaskId()); + RAY_CHECK(it != pinned_task_arguments_.end()); + num_pinned_task_arguments_ -= it->second.size(); + pinned_task_arguments_.erase(it); if (worker->GetAllocatedInstances() != nullptr) { ReleaseWorkerResources(worker); } @@ -633,6 +672,8 @@ std::string ClusterTaskManager::DebugStr() const { buffer << "Schedule queue length: " << num_tasks_to_schedule << "\n"; buffer << "Dispatch queue length: " << num_tasks_to_dispatch << "\n"; buffer << "Waiting tasks size: " << waiting_tasks_.size() << "\n"; + buffer << "Number of executing tasks: " << pinned_task_arguments_.size() << "\n"; + buffer << "Number of pinned task arguments: " << num_pinned_task_arguments_ << "\n"; buffer << "cluster_resource_scheduler state: " << cluster_resource_scheduler_->DebugString() << "\n"; buffer << "=================================================="; diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index f632357e1..7f2652ceb 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -2,6 +2,7 @@ #include "absl/container/flat_hash_map.h" #include "absl/container/flat_hash_set.h" +#include "ray/common/ray_object.h" #include "ray/common/task/task.h" #include "ray/common/task/task_common.h" #include "ray/raylet/dependency_manager.h" @@ -60,7 +61,10 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { NodeInfoGetter get_node_info, std::function announce_infeasible_task, WorkerPoolInterface &worker_pool, - std::unordered_map> &leased_workers); + std::unordered_map> &leased_workers, + std::function &object_ids, + std::vector> *results)> + pin_task_arguments); /// (Step 1) Queue tasks and schedule. /// Queue task and schedule. This hanppens when processing the worker lease request. @@ -248,6 +252,22 @@ class ClusterTaskManager : public ClusterTaskManagerInterface { WorkerPoolInterface &worker_pool_; std::unordered_map> &leased_workers_; + /// Callback to get references to task arguments. These will be pinned while + /// the task is running. + std::function &object_ids, + std::vector> *results)> + pin_task_arguments_; + + /// Arguments needed by currently granted lease requests. These should be + /// pinned before the lease is granted to ensure that the arguments are not + /// evicted before the task(s) start running. + std::unordered_map>> + pinned_task_arguments_; + + /// The total number of arguments pinned for running tasks. + /// Used for debug purposes. + size_t num_pinned_task_arguments_ = 0; + /// Determine whether a task should be immediately dispatched, /// or placed on a wait queue. /// diff --git a/src/ray/raylet/scheduling/cluster_task_manager_test.cc b/src/ray/raylet/scheduling/cluster_task_manager_test.cc index 776e7fc53..80a9406da 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager_test.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager_test.cc @@ -85,7 +85,7 @@ Task CreateTask(const std::unordered_map &required_resource std::make_pair(PlacementGroupID::Nil(), -1), true, ""); for (int i = 0; i < num_args; i++) { - ObjectID put_id = ObjectID::FromIndex(TaskID::Nil(), /*index=*/i + 1); + ObjectID put_id = ObjectID::FromIndex(RandomTaskId(), /*index=*/i + 1); spec_builder.AddArg(TaskArgByReference(put_id, rpc::Address())); } @@ -96,20 +96,25 @@ Task CreateTask(const std::unordered_map &required_resource class MockTaskDependencyManager : public TaskDependencyManagerInterface { public: + MockTaskDependencyManager(std::unordered_set &missing_objects) + : missing_objects_(missing_objects) {} + bool RequestTaskDependencies( const TaskID &task_id, const std::vector &required_objects) { RAY_CHECK(subscribed_tasks.insert(task_id).second); - return task_ready_; + for (auto &obj_ref : required_objects) { + if (missing_objects_.count(ObjectRefToId(obj_ref))) { + return false; + } + } + return true; } void RemoveTaskDependencies(const TaskID &task_id) { RAY_CHECK(subscribed_tasks.erase(task_id)); } - bool IsTaskReady(const TaskID &task_id) const { return task_ready_; } - - bool task_ready_ = true; - + std::unordered_set &missing_objects_; std::unordered_set subscribed_tasks; }; @@ -121,16 +126,34 @@ class ClusterTaskManagerTest : public ::testing::Test { is_owner_alive_(true), node_info_calls_(0), announce_infeasible_task_calls_(0), - task_manager_(id_, scheduler_, dependency_manager_, - [this](const WorkerID &worker_id, const NodeID &node_id) { - return is_owner_alive_; - }, - [this](const NodeID &node_id) { - node_info_calls_++; - return node_info_[node_id]; - }, - [this](const Task &task) { announce_infeasible_task_calls_++; }, - pool_, leased_workers_) {} + dependency_manager_(missing_objects_), + task_manager_( + id_, scheduler_, dependency_manager_, + [this](const WorkerID &worker_id, const NodeID &node_id) { + return is_owner_alive_; + }, + [this](const NodeID &node_id) { + node_info_calls_++; + return node_info_[node_id]; + }, + [this](const Task &task) { announce_infeasible_task_calls_++; }, pool_, + leased_workers_, + [this](const std::vector &object_ids, + std::vector> *results) { + for (auto &obj_id : object_ids) { + if (missing_objects_.count(obj_id) == 0) { + std::string meta = "metadata"; + auto metadata = const_cast( + reinterpret_cast(meta.data())); + auto meta_buffer = + std::make_shared(metadata, meta.size()); + results->emplace_back(new RayObject(nullptr, meta_buffer, {})); + } else { + results->emplace_back(nullptr); + } + } + return true; + }) {} void SetUp() {} @@ -153,13 +176,25 @@ class ClusterTaskManagerTest : public ::testing::Test { ASSERT_TRUE(task_manager_.tasks_to_dispatch_.empty()); ASSERT_TRUE(task_manager_.waiting_tasks_.empty()); ASSERT_TRUE(task_manager_.infeasible_tasks_.empty()); + ASSERT_TRUE(task_manager_.pinned_task_arguments_.empty()); + ASSERT_EQ(task_manager_.num_pinned_task_arguments_, 0); ASSERT_TRUE(dependency_manager_.subscribed_tasks.empty()); } + void AssertPinnedTaskArgumentsEquals(const TaskID &task_id, size_t num_args_expected) { + ASSERT_EQ(task_manager_.pinned_task_arguments_[task_id].size(), num_args_expected); + size_t num_args = 0; + for (auto &args : task_manager_.pinned_task_arguments_) { + num_args += args.second.size(); + } + ASSERT_EQ(task_manager_.num_pinned_task_arguments_, num_args); + } + NodeID id_; std::shared_ptr scheduler_; MockWorkerPool pool_; std::unordered_map> leased_workers_; + std::unordered_set missing_objects_; bool is_owner_alive_; @@ -203,6 +238,11 @@ TEST_F(ClusterTaskManagerTest, BasicTest) { ASSERT_EQ(pool_.workers.size(), 0); ASSERT_EQ(node_info_calls_, 0); + Task finished_task; + task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task); + ASSERT_EQ(finished_task.GetTaskSpecification().TaskId(), + task.GetTaskSpecification().TaskId()); + AssertNoLeaks(); } @@ -252,8 +292,9 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) { }; /* Blocked on dependencies */ - dependency_manager_.task_ready_ = false; - auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 1); + auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 2); + auto missing_arg = task.GetTaskSpecification().GetDependencyIds()[0]; + missing_objects_.insert(missing_arg); std::unordered_set expected_subscribed_tasks = { task.GetTaskSpecification().TaskId()}; task_manager_.QueueAndScheduleTask(task, &reply, callback); @@ -264,36 +305,42 @@ TEST_F(ClusterTaskManagerTest, ResourceTakenWhileResolving) { ASSERT_EQ(pool_.workers.size(), 2); /* This task can run */ - auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}}); + auto task2 = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 1); task_manager_.QueueAndScheduleTask(task2, &reply, callback); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); + AssertPinnedTaskArgumentsEquals(task2.GetTaskSpecification().TaskId(), 1); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(pool_.workers.size(), 1); /* First task is unblocked now, but resources are no longer available */ - dependency_manager_.task_ready_ = true; + missing_objects_.erase(missing_arg); auto id = task.GetTaskSpecification().TaskId(); std::vector unblocked = {id}; task_manager_.TasksUnblocked(unblocked); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); + AssertPinnedTaskArgumentsEquals(task2.GetTaskSpecification().TaskId(), 1); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(pool_.workers.size(), 1); /* Second task finishes, making space for the original task */ + Task finished_task; + task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task); leased_workers_.clear(); - task_manager_.ReleaseWorkerResources(worker); task_manager_.ScheduleAndDispatchTasks(); ASSERT_TRUE(dependency_manager_.subscribed_tasks.empty()); // Task2 is now done so task can run. + AssertPinnedTaskArgumentsEquals(task.GetTaskSpecification().TaskId(), 2); ASSERT_EQ(num_callbacks, 2); ASSERT_EQ(leased_workers_.size(), 1); ASSERT_EQ(pool_.workers.size(), 0); + + task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task); AssertNoLeaks(); } @@ -342,6 +389,12 @@ TEST_F(ClusterTaskManagerTest, TestSpillAfterAssigned) { // The second task was spilled. ASSERT_EQ(spillback_reply.retry_at_raylet_address().raylet_id(), remote_node_id.Binary()); + + Task finished_task; + task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task); + ASSERT_EQ(finished_task.GetTaskSpecification().TaskId(), + task.GetTaskSpecification().TaskId()); + AssertNoLeaks(); } @@ -385,6 +438,12 @@ TEST_F(ClusterTaskManagerTest, TaskCancellationTest) { ASSERT_FALSE(callback_called); ASSERT_EQ(pool_.workers.size(), 0); ASSERT_EQ(leased_workers_.size(), 1); + + Task finished_task; + task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task); + ASSERT_EQ(finished_task.GetTaskSpecification().TaskId(), + task.GetTaskSpecification().TaskId()); + AssertNoLeaks(); } @@ -615,6 +674,12 @@ TEST_F(ClusterTaskManagerTest, BacklogReportTest) { task_manager_.FillResourceUsage(data); auto resource_load_by_shape = data->resource_load_by_shape(); ASSERT_EQ(resource_load_by_shape.resource_demands().size(), 0); + + while (!leased_workers_.empty()) { + Task finished_task; + task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task); + leased_workers_.erase(leased_workers_.begin()); + } AssertNoLeaks(); } } @@ -785,8 +850,9 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) { }; /* Blocked on dependencies */ - dependency_manager_.task_ready_ = false; auto task = CreateTask({{ray::kCPU_ResourceLabel, 5}}, 2); + auto missing_arg = task.GetTaskSpecification().GetDependencyIds()[0]; + missing_objects_.insert(missing_arg); std::unordered_set expected_subscribed_tasks = { task.GetTaskSpecification().TaskId()}; task_manager_.QueueAndScheduleTask(task, &reply, callback); @@ -795,7 +861,7 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) { ASSERT_EQ(leased_workers_.size(), 0); /* Task is unblocked now */ - dependency_manager_.task_ready_ = true; + missing_objects_.erase(missing_arg); pool_.workers.clear(); auto id = task.GetTaskSpecification().TaskId(); task_manager_.TasksUnblocked({id}); @@ -804,7 +870,7 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) { ASSERT_EQ(leased_workers_.size(), 0); /* Task argument gets evicted */ - dependency_manager_.task_ready_ = false; + missing_objects_.insert(missing_arg); pool_.PushWorker(std::dynamic_pointer_cast(worker)); task_manager_.ScheduleAndDispatchTasks(); ASSERT_EQ(dependency_manager_.subscribed_tasks, expected_subscribed_tasks); @@ -812,10 +878,16 @@ TEST_F(ClusterTaskManagerTest, ArgumentEvicted) { ASSERT_EQ(leased_workers_.size(), 0); /* Worker available and arguments available */ - dependency_manager_.task_ready_ = true; + missing_objects_.erase(missing_arg); task_manager_.TasksUnblocked({id}); ASSERT_EQ(num_callbacks, 1); ASSERT_EQ(leased_workers_.size(), 1); + + Task finished_task; + task_manager_.TaskFinished(leased_workers_.begin()->second, &finished_task); + ASSERT_EQ(finished_task.GetTaskSpecification().TaskId(), + task.GetTaskSpecification().TaskId()); + AssertNoLeaks(); } diff --git a/src/ray/raylet/test/util.h b/src/ray/raylet/test/util.h index 8527220e3..c43a386fb 100644 --- a/src/ray/raylet/test/util.h +++ b/src/ray/raylet/test/util.h @@ -33,7 +33,7 @@ class MockWorker : public WorkerInterface { void AssignTaskId(const TaskID &task_id) {} - void SetAssignedTask(const Task &assigned_task) {} + void SetAssignedTask(const Task &assigned_task) { task_ = assigned_task; } const std::string IpAddress() const { return address_.ip_address(); } @@ -162,11 +162,7 @@ class MockWorker : public WorkerInterface { void SetBundleId(const BundleID &bundle_id) { bundle_id_ = bundle_id; } - Task &GetAssignedTask() { - RAY_CHECK(false) << "Method unused"; - auto *t = new Task(); - return *t; - } + Task &GetAssignedTask() { return task_; } bool IsRegistered() { RAY_CHECK(false) << "Method unused"; @@ -188,6 +184,7 @@ class MockWorker : public WorkerInterface { bool is_detached_actor_; BundleID bundle_id_; bool blocked_ = false; + Task task_; }; } // namespace raylet