From 0031723ace0bf54d505db2775b5b60e1e9eb4eea Mon Sep 17 00:00:00 2001 From: Alex Wu Date: Tue, 15 Dec 2020 11:05:38 -0800 Subject: [PATCH] [New scheduler] Object spilling (#12857) --- python/ray/tests/test_object_spilling.py | 4 +--- src/ray/raylet/node_manager.cc | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/python/ray/tests/test_object_spilling.py b/python/ray/tests/test_object_spilling.py index 5a71f5166..9004fd030 100644 --- a/python/ray/tests/test_object_spilling.py +++ b/python/ray/tests/test_object_spilling.py @@ -12,7 +12,7 @@ import psutil import ray from ray.external_storage import (create_url_with_offset, parse_url_with_offset) -from ray.test_utils import new_scheduler_enabled, wait_for_condition +from ray.test_utils import wait_for_condition bucket_name = "object-spilling-test" spill_local_path = "/tmp/spill" @@ -338,7 +338,6 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -@pytest.mark.skipif(new_scheduler_enabled(), reason="hangs") def test_spill_during_get(object_spilling_config, shutdown_only): ray.init( num_cpus=4, @@ -569,7 +568,6 @@ def test_delete_objects_on_worker_failure(tmp_path, shutdown_only): @pytest.mark.skipif( platform.system() == "Windows", reason="Failing on Windows.") -@pytest.mark.skipif(new_scheduler_enabled(), reason="flaky") def test_delete_objects_multi_node(tmp_path, ray_start_cluster): # Limit our object store to 75 MiB of memory. temp_folder = tmp_path / "spill" diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 3119bd8d4..9c452c5a2 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -136,14 +136,19 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self RayConfig::instance().light_report_resource_usage_enabled()), initial_config_(config), local_available_resources_(config.resource_config), - worker_pool_( - io_service, config.num_workers_soft_limit, - config.num_initial_python_workers_for_first_job, - config.maximum_startup_concurrency, config.min_worker_port, - config.max_worker_port, config.worker_ports, gcs_client_, - config.worker_commands, config.raylet_config, - /*starting_worker_timeout_callback=*/ - [this]() { this->DispatchTasks(this->local_queues_.GetReadyTasksByClass()); }), + worker_pool_(io_service, config.num_workers_soft_limit, + config.num_initial_python_workers_for_first_job, + config.maximum_startup_concurrency, config.min_worker_port, + config.max_worker_port, config.worker_ports, gcs_client_, + config.worker_commands, config.raylet_config, + /*starting_worker_timeout_callback=*/ + [this]() { + if (RayConfig::instance().new_scheduler_enabled()) { + ScheduleAndDispatch(); + } else { + this->DispatchTasks(this->local_queues_.GetReadyTasksByClass()); + } + }), scheduling_policy_(local_queues_), reconstruction_policy_( io_service_,