[New scheduler] Object spilling (#12857)

This commit is contained in:
Alex Wu
2020-12-15 11:05:38 -08:00
committed by GitHub
parent cde711aaf1
commit 0031723ace
2 changed files with 14 additions and 11 deletions
+1 -3
View File
@@ -12,7 +12,7 @@ import psutil
import ray
from ray.external_storage import (create_url_with_offset,
parse_url_with_offset)
from ray.test_utils import new_scheduler_enabled, wait_for_condition
from ray.test_utils import wait_for_condition
bucket_name = "object-spilling-test"
spill_local_path = "/tmp/spill"
@@ -338,7 +338,6 @@ def test_spill_objects_automatically(object_spilling_config, shutdown_only):
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
@pytest.mark.skipif(new_scheduler_enabled(), reason="hangs")
def test_spill_during_get(object_spilling_config, shutdown_only):
ray.init(
num_cpus=4,
@@ -569,7 +568,6 @@ def test_delete_objects_on_worker_failure(tmp_path, shutdown_only):
@pytest.mark.skipif(
platform.system() == "Windows", reason="Failing on Windows.")
@pytest.mark.skipif(new_scheduler_enabled(), reason="flaky")
def test_delete_objects_multi_node(tmp_path, ray_start_cluster):
# Limit our object store to 75 MiB of memory.
temp_folder = tmp_path / "spill"
+13 -8
View File
@@ -136,14 +136,19 @@ NodeManager::NodeManager(boost::asio::io_service &io_service, const NodeID &self
RayConfig::instance().light_report_resource_usage_enabled()),
initial_config_(config),
local_available_resources_(config.resource_config),
worker_pool_(
io_service, config.num_workers_soft_limit,
config.num_initial_python_workers_for_first_job,
config.maximum_startup_concurrency, config.min_worker_port,
config.max_worker_port, config.worker_ports, gcs_client_,
config.worker_commands, config.raylet_config,
/*starting_worker_timeout_callback=*/
[this]() { this->DispatchTasks(this->local_queues_.GetReadyTasksByClass()); }),
worker_pool_(io_service, config.num_workers_soft_limit,
config.num_initial_python_workers_for_first_job,
config.maximum_startup_concurrency, config.min_worker_port,
config.max_worker_port, config.worker_ports, gcs_client_,
config.worker_commands, config.raylet_config,
/*starting_worker_timeout_callback=*/
[this]() {
if (RayConfig::instance().new_scheduler_enabled()) {
ScheduleAndDispatch();
} else {
this->DispatchTasks(this->local_queues_.GetReadyTasksByClass());
}
}),
scheduling_policy_(local_queues_),
reconstruction_policy_(
io_service_,