diff --git a/python/ray/tests/test_basic.py b/python/ray/tests/test_basic.py index 3eac6047a..8a875209b 100644 --- a/python/ray/tests/test_basic.py +++ b/python/ray/tests/test_basic.py @@ -63,6 +63,9 @@ def test_simple_serialization(ray_start_regular): np.float64(1.9), ] + if sys.version_info < (3, 0): + primitive_objects.append(long(0)) # noqa: E501,F821 + composite_objects = ( [[obj] for obj in primitive_objects] + [(obj, ) @@ -88,25 +91,6 @@ def test_simple_serialization(ray_start_regular): assert type(obj) == type(new_obj_2) -def test_background_tasks_with_max_calls(shutdown_only): - ray.init(num_cpus=2) - - @ray.remote - def g(): - time.sleep(.1) - return 0 - - @ray.remote(max_calls=1, max_retries=0) - def f(): - return [g.remote()] - - nested = ray.get([f.remote() for _ in range(10)]) - - # Should still be able to retrieve these objects, since f's workers will - # wait for g to finish before exiting. - ray.get([x[0] for x in nested]) - - def test_fair_queueing(shutdown_only): ray.init( num_cpus=1, _internal_config=json.dumps({ @@ -182,7 +166,17 @@ def complex_serialization(use_pickle): assert obj1 == obj2, "Objects {} and {} are different.".format( obj1, obj2) - long_extras = [0, np.array([["hi", u"hi"], [1.3, 1]])] + if sys.version_info >= (3, 0): + long_extras = [0, np.array([["hi", u"hi"], [1.3, 1]])] + else: + + long_extras = [ + long(0), # noqa: E501,F821 + np.array([ + ["hi", u"hi"], + [1.3, long(1)] # noqa: E501,F821 + ]) + ] PRIMITIVE_OBJECTS = [ 0, 0.0, 0.9, 1 << 62, 1 << 100, 1 << 999, [1 << 100, [1 << 100]], "a", @@ -797,6 +791,8 @@ def test_keyword_args(ray_start_regular): assert ray.get(f3.remote(4)) == 4 +@pytest.mark.skipif( + sys.version_info < (3, 0), reason="This test requires Python 3.") @pytest.mark.parametrize( "ray_start_regular", [{ "local_mode": True @@ -832,6 +828,8 @@ def test_args_starkwargs(ray_start_regular): ray.get(remote_test_function.remote(local_method, actor_method)) +@pytest.mark.skipif( + sys.version_info < (3, 0), reason="This test requires Python 3.") @pytest.mark.parametrize( "ray_start_regular", [{ "local_mode": True @@ -873,6 +871,8 @@ def test_args_named_and_star(ray_start_regular): ray.get(remote_test_function.remote(local_method, actor_method)) +@pytest.mark.skipif( + sys.version_info < (3, 0), reason="This test requires Python 3.") @pytest.mark.parametrize( "ray_start_regular", [{ "local_mode": True @@ -1636,4 +1636,5 @@ def test_wait(ray_start_regular): if __name__ == "__main__": import pytest + import sys sys.exit(pytest.main(["-v", __file__])) diff --git a/src/ray/common/scheduling/cluster_resource_scheduler.cc b/src/ray/common/scheduling/cluster_resource_scheduler.cc index db4ec83a6..65e6ea5ac 100644 --- a/src/ray/common/scheduling/cluster_resource_scheduler.cc +++ b/src/ray/common/scheduling/cluster_resource_scheduler.cc @@ -372,7 +372,9 @@ void ClusterResourceScheduler::ResourceMapToTaskRequest( const std::unordered_map &resource_map, TaskRequest *task_request) { size_t i = 0; + task_request->predefined_resources.resize(PredefinedResources_MAX); + task_request->custom_resources.resize(resource_map.size()); for (size_t i = 0; i < PredefinedResources_MAX; i++) { task_request->predefined_resources[0].demand = 0; task_request->predefined_resources[0].soft = false; @@ -388,13 +390,13 @@ void ClusterResourceScheduler::ResourceMapToTaskRequest( } else if (it->first == ray::kMemory_ResourceLabel) { task_request->predefined_resources[MEM].demand = it->second; } else { - // This is a custom resource. task_request->custom_resources[i].id = string_to_int_map_.Insert(it->first); task_request->custom_resources[i].req.demand = it->second; task_request->custom_resources[i].req.soft = false; i++; } } + task_request->custom_resources.resize(i); } void ClusterResourceScheduler::UpdateResourceCapacity(const std::string &client_id_string, diff --git a/src/ray/common/scheduling/scheduling_ids.cc b/src/ray/common/scheduling/scheduling_ids.cc index 95e7a6de9..976deadd3 100644 --- a/src/ray/common/scheduling/scheduling_ids.cc +++ b/src/ray/common/scheduling/scheduling_ids.cc @@ -13,7 +13,7 @@ std::string StringIdMap::Get(uint64_t id) { std::string id_string; auto it = int_to_string_.find(id); if (it == int_to_string_.end()) { - id_string = std::to_string(-1); + id_string = "-1"; } else { id_string = it->second; } diff --git a/src/ray/common/scheduling/scheduling_ids.h b/src/ray/common/scheduling/scheduling_ids.h index 8110cf18d..8b06afa85 100644 --- a/src/ray/common/scheduling/scheduling_ids.h +++ b/src/ray/common/scheduling/scheduling_ids.h @@ -2,6 +2,7 @@ #define RAY_COMMON_SCHEDULING_SCHEDULING_IDS_H #include "absl/container/flat_hash_map.h" +#include "ray/util/logging.h" #include diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index a41241b4d..c6c4fc910 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1187,20 +1187,12 @@ void NodeManager::ProcessDisconnectClientMessage( local_available_resources_.ReleaseConstrained( task_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet()); - if (new_scheduler_enabled_) { - new_resource_scheduler_->AddNodeAvailableResources( - self_node_id_.Binary(), task_resources.ToResourceSet().GetResourceMap()); - } worker->ResetTaskResourceIds(); auto const &lifetime_resources = worker->GetLifetimeResourceIds(); local_available_resources_.ReleaseConstrained( lifetime_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); cluster_resource_map_[self_node_id_].Release(lifetime_resources.ToResourceSet()); - if (new_scheduler_enabled_) { - new_resource_scheduler_->AddNodeAvailableResources( - self_node_id_.Binary(), lifetime_resources.ToResourceSet().GetResourceMap()); - } worker->ResetLifetimeResourceIds(); RAY_LOG(DEBUG) << "Worker (pid=" << worker->Pid() << ") is disconnected. " @@ -1451,11 +1443,27 @@ void NodeManager::DispatchScheduledTasksToWorkers() { while (!tasks_to_dispatch_.empty()) { auto task = tasks_to_dispatch_.front(); auto reply = task.first; - std::shared_ptr worker = - worker_pool_.PopWorker(task.second.GetTaskSpecification()); + auto spec = task.second.GetTaskSpecification(); + std::shared_ptr worker = worker_pool_.PopWorker(spec); if (worker == nullptr) { return; } + + bool schedulable = new_resource_scheduler_->SubtractNodeAvailableResources( + self_node_id_.Binary(), spec.GetRequiredResources().GetResourceMap()); + if (!schedulable) { + return; + } + // Handle the allocation to specific resource IDs. + auto acquired_resources = + local_available_resources_.Acquire(spec.GetRequiredResources()); + cluster_resource_map_[self_node_id_].Acquire(spec.GetRequiredResources()); + if (spec.IsActorCreationTask()) { + worker->SetLifetimeResourceIds(acquired_resources); + } else { + worker->SetTaskResourceIds(acquired_resources); + } + reply(worker, ClientID::Nil(), "", -1); tasks_to_dispatch_.pop_front(); } @@ -1475,11 +1483,11 @@ void NodeManager::NewSchedulerSchedulePendingTasks() { /// There is no node that has available resources to run the request. break; } else { - new_resource_scheduler_->SubtractNodeAvailableResources(node_id_string, - request_resources); if (node_id_string == self_node_id_.Binary()) { - tasks_to_dispatch_.push_back(work); + WaitForTaskArgsRequests(work); } else { + new_resource_scheduler_->SubtractNodeAvailableResources(node_id_string, + request_resources); ClientID node_id = ClientID::FromBinary(node_id_string); auto node_info_opt = gcs_client_->Nodes().Get(node_id); RAY_CHECK(node_info_opt) @@ -1494,6 +1502,24 @@ void NodeManager::NewSchedulerSchedulePendingTasks() { DispatchScheduledTasksToWorkers(); } +void NodeManager::WaitForTaskArgsRequests(std::pair &work) { + RAY_CHECK(new_scheduler_enabled_); + std::vector object_ids = work.second.GetTaskSpecification().GetDependencies(); + + if (object_ids.size() > 0) { + ray::Status status = object_manager_.Wait( + object_ids, -1, object_ids.size(), false, + [this, work](std::vector found, std::vector remaining) { + RAY_CHECK(remaining.empty()); + tasks_to_dispatch_.push_back(work); + DispatchScheduledTasksToWorkers(); + }); + RAY_CHECK_OK(status); + } else { + tasks_to_dispatch_.push_back(work); + } +}; + void NodeManager::HandleWorkerLeaseRequest(const rpc::RequestWorkerLeaseRequest &request, rpc::RequestWorkerLeaseReply *reply, rpc::SendReplyCallback send_reply_callback) { @@ -1514,8 +1540,7 @@ void NodeManager::HandleWorkerLeaseRequest(const rpc::RequestWorkerLeaseRequest } if (new_scheduler_enabled_) { - auto request_resources = - task.GetTaskSpecification().GetRequiredResources().GetResourceMap(); + auto request_resources = task.GetTaskSpecification().GetRequiredResources(); auto work = std::make_pair( [this, request_resources, reply, send_reply_callback]( std::shared_ptr worker, ClientID spillback_to, std::string address, @@ -1599,12 +1624,38 @@ void NodeManager::HandleReturnWorker(const rpc::ReturnWorkerRequest &request, std::shared_ptr worker = std::move(leased_workers_[worker_id]); if (new_scheduler_enabled_) { + if (worker->IsBlocked()) { + // If worker blocked, unblock it to return the cpu resources back to the worker. + HandleDirectCallTaskUnblocked(worker); + } auto it = leased_worker_resources_.find(worker_id); RAY_CHECK(it != leased_worker_resources_.end()); + new_resource_scheduler_->AddNodeAvailableResources(self_node_id_.Binary(), - it->second); + it->second.GetResourceMap()); + + if (worker->borrowed_cpu_resources_.GetResourceMap().size()) { + // This machine is oversubscribed, so the worker didn't get back cpus when + // unblocked. Thus we need to substract these cpus, as the previous + // "AddNodeAvailableResources" call assumed they were allocated to this worker. + new_resource_scheduler_->SubtractNodeAvailableResources( + self_node_id_.Binary(), worker->borrowed_cpu_resources_.GetResourceMap()); + worker->borrowed_cpu_resources_ = ResourceSet(); + } leased_worker_resources_.erase(it); - NewSchedulerSchedulePendingTasks(); + + // Update resource ids. + auto const &task_resources = worker->GetTaskResourceIds(); + local_available_resources_.ReleaseConstrained( + task_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); + cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet()); + worker->ResetTaskResourceIds(); + + // TODO (ion): Handle ProcessDisconnectClientMessage() + HandleWorkerAvailable(worker); + leased_workers_.erase(worker_id); + send_reply_callback(Status::OK(), nullptr, nullptr); + return; } leased_workers_.erase(worker_id); @@ -2003,13 +2054,21 @@ void NodeManager::SubmitTask(const Task &task, const Lineage &uncommitted_lineag void NodeManager::HandleDirectCallTaskBlocked(const std::shared_ptr &worker) { if (new_scheduler_enabled_) { - // TODO (ion): replace this hard coded # of CPUs. - std::unordered_map task_request; - task_request.emplace(kCPU_ResourceLabel, 1.); - new_resource_scheduler_->AddNodeAvailableResources(self_node_id_.Binary(), - task_request); + if (!worker) { + return; + } + auto const cpu_resource_ids = worker->ReleaseTaskCpuResources(); + local_available_resources_.Release(cpu_resource_ids); + cluster_resource_map_[self_node_id_].Release(cpu_resource_ids.ToResourceSet()); + new_resource_scheduler_->AddNodeAvailableResources( + self_node_id_.Binary(), // A + cpu_resource_ids.ToResourceSet().GetResourceMap()); + + worker->MarkBlocked(); + NewSchedulerSchedulePendingTasks(); return; } + if (!worker || worker->GetAssignedTaskId().IsNil() || worker->IsBlocked()) { return; // The worker may have died or is no longer processing the task. } @@ -2021,6 +2080,34 @@ void NodeManager::HandleDirectCallTaskBlocked(const std::shared_ptr &wor } void NodeManager::HandleDirectCallTaskUnblocked(const std::shared_ptr &worker) { + if (new_scheduler_enabled_) { + if (!worker) { + return; + } + auto it = leased_worker_resources_.find(worker->WorkerId()); + RAY_CHECK(it != leased_worker_resources_.end()); + const auto cpu_resources = it->second.GetNumCpus(); + bool oversubscribed = !local_available_resources_.Contains(cpu_resources); + if (!oversubscribed) { + // Reacquire the CPU resources for the worker. Note that care needs to be + // taken if the user is using the specific CPU IDs since the IDs that we + // reacquire here may be different from the ones that the task started with. + auto const resource_ids = local_available_resources_.Acquire(cpu_resources); + worker->AcquireTaskCpuResources(resource_ids); + cluster_resource_map_[self_node_id_].Acquire(cpu_resources); + new_resource_scheduler_->SubtractNodeAvailableResources( + self_node_id_.Binary(), cpu_resources.GetResourceMap()); + worker->borrowed_cpu_resources_ = ResourceSet(); + } else { + // Remember these are borrowed cpus resources, i.e., we did not return then to the + // worker. + worker->borrowed_cpu_resources_ = cpu_resources; + } + worker->MarkUnblocked(); + NewSchedulerSchedulePendingTasks(); + return; + } + if (!worker || worker->GetAssignedTaskId().IsNil() || !worker->IsBlocked()) { return; // The worker may have died or is no longer processing the task. } @@ -2135,10 +2222,6 @@ void NodeManager::AsyncResolveObjectsFinish( auto const resource_ids = local_available_resources_.Acquire(cpu_resources); worker->AcquireTaskCpuResources(resource_ids); cluster_resource_map_[self_node_id_].Acquire(cpu_resources); - if (new_scheduler_enabled_) { - new_resource_scheduler_->SubtractNodeAvailableResources( - self_node_id_.Binary(), cpu_resources.GetResourceMap()); - } } else { // In this case, we simply don't reacquire the CPU resources for the worker. // The worker can keep running and when the task finishes, it will simply @@ -2213,10 +2296,6 @@ void NodeManager::AssignTask(const std::shared_ptr &worker, const Task & auto acquired_resources = local_available_resources_.Acquire(spec.GetRequiredResources()); cluster_resource_map_[self_node_id_].Acquire(spec.GetRequiredResources()); - if (new_scheduler_enabled_) { - new_resource_scheduler_->AddNodeAvailableResources( - self_node_id_.Binary(), spec.GetRequiredResources().GetResourceMap()); - } if (spec.IsActorCreationTask()) { // Check that the actor's placement resource requirements are satisfied. @@ -2274,10 +2353,6 @@ bool NodeManager::FinishAssignedTask(Worker &worker) { local_available_resources_.ReleaseConstrained( task_resources, cluster_resource_map_[self_node_id_].GetTotalResources()); cluster_resource_map_[self_node_id_].Release(task_resources.ToResourceSet()); - if (new_scheduler_enabled_) { - new_resource_scheduler_->AddNodeAvailableResources( - self_node_id_.Binary(), task_resources.ToResourceSet().GetResourceMap()); - } worker.ResetTaskResourceIds(); const auto &spec = task.GetTaskSpecification(); diff --git a/src/ray/raylet/node_manager.h b/src/ray/raylet/node_manager.h index fa68abacd..c3fdd130c 100644 --- a/src/ray/raylet/node_manager.h +++ b/src/ray/raylet/node_manager.h @@ -645,8 +645,8 @@ class NodeManager : public rpc::NodeManagerServiceHandler { /// The new resource scheduler for direct task calls. std::shared_ptr new_resource_scheduler_; /// Map of leased workers to their current resource usage. - std::unordered_map> - leased_worker_resources_; + /// TODO(ion): Check whether we can track these resources in the worker. + std::unordered_map leased_worker_resources_; typedef std::function, ClientID spillback_to, std::string address, int port)> @@ -657,6 +657,9 @@ class NodeManager : public rpc::NodeManagerServiceHandler { std::deque> tasks_to_schedule_; /// Queue of lease requests that should be scheduled onto workers. std::deque> tasks_to_dispatch_; + + /// XXX + void WaitForTaskArgsRequests(std::pair &work); }; } // namespace raylet diff --git a/src/ray/raylet/worker.h b/src/ray/raylet/worker.h index de2caa981..fde77f4a2 100644 --- a/src/ray/raylet/worker.h +++ b/src/ray/raylet/worker.h @@ -68,6 +68,12 @@ class Worker { void DirectActorCallArgWaitComplete(int64_t tag); void WorkerLeaseGranted(const std::string &address, int port); + /// Cpus borrowed by the worker. This happens when the machine is oversubscribed + /// and the worker does not get back the cpu resources when unblocked. + /// TODO (ion): Add methods to access this variable. + /// TODO (ion): Investigate a more intuitive alternative to track these Cpus. + ResourceSet borrowed_cpu_resources_; + rpc::CoreWorkerClient *rpc_client() { return rpc_client_.get(); } private: