Enable by default new scheduler (#12735)

This commit is contained in:
Eric Liang
2020-12-19 13:22:24 -08:00
committed by GitHub
parent 5d3c9c8861
commit 64c97d25d3
9 changed files with 50 additions and 58 deletions
-44
View File
@@ -20,50 +20,6 @@ before_install:
matrix:
include:
- os: linux
env:
- PYTHON=3.6 SMALL_AND_LARGE_TESTS=1 RAY_ENABLE_NEW_SCHEDULER=1
- PYTHONWARNINGS=ignore
- RAY_DEFAULT_BUILD=1
- RAY_CYTHON_EXAMPLES=1
- RAY_USE_RANDOM_PORTS=1
install:
- . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED,RAY_CI_DASHBOARD_AFFECTED
before_script:
- . ./ci/travis/ci.sh build
script:
# bazel python tests. This should be run last to keep its logs at the end of travis logs.
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then ./ci/keep_alive bazel test --config=ci $(./scripts/bazel_export_options) --test_tag_filters=-jenkins_only,-medium_size_python_tests_a_to_j,-medium_size_python_tests_k_to_z,-new_scheduler_broken python/ray/tests/...; fi
- os: linux
env:
- PYTHON=3.6 MEDIUM_TESTS_A_TO_J=1 RAY_ENABLE_NEW_SCHEDULER=1
- PYTHONWARNINGS=ignore
- RAY_DEFAULT_BUILD=1
- RAY_CYTHON_EXAMPLES=1
- RAY_USE_RANDOM_PORTS=1
install:
- . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED,RAY_CI_DASHBOARD_AFFECTED
before_script:
- . ./ci/travis/ci.sh build
script:
# bazel python tests for medium size tests. Used for parallelization.
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then ./ci/keep_alive bazel test --config=ci $(./scripts/bazel_export_options) --test_tag_filters=-jenkins_only,medium_size_python_tests_a_to_j,-new_scheduler_broken python/ray/tests/...; fi
- os: linux
env:
- PYTHON=3.6 MEDIUM_TESTS_K_TO_Z=1 RAY_ENABLE_NEW_SCHEDULER=1
- PYTHONWARNINGS=ignore
- RAY_DEFAULT_BUILD=1
- RAY_CYTHON_EXAMPLES=1
- RAY_USE_RANDOM_PORTS=1
install:
- . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED,RAY_CI_DASHBOARD_AFFECTED
before_script:
- . ./ci/travis/ci.sh build
script:
# bazel python tests for medium size tests. Used for parallelization.
- if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then ./ci/keep_alive bazel test --config=ci $(./scripts/bazel_export_options) --test_tag_filters=-jenkins_only,medium_size_python_tests_k_to_z,-new_scheduler_broken python/ray/tests/...; fi
- os: linux
env:
- PYTHON=3.6 SMALL_AND_LARGE_TESTS=1
@@ -35,7 +35,7 @@ def test_actor_groups(ray_start_with_dashboard):
assert wait_until_server_available(webui_url)
webui_url = format_web_url(webui_url)
timeout_seconds = 5
timeout_seconds = 10
start_time = time.time()
last_ex = None
while True:
@@ -15,7 +15,8 @@ public class DynamicResourceTest extends BaseTest {
return "hi";
}
@Test(groups = {"cluster"})
// Dynamic resources not supported yet.
@Test(groups = {"cluster"}, enabled = false)
public void testSetResource() {
// Call a task in advance to warm up the cluster to avoid being too slow to start workers.
TestUtils.warmUpCluster();
+1 -1
View File
@@ -442,4 +442,4 @@ def format_web_url(url):
def new_scheduler_enabled():
return os.environ.get("RAY_ENABLE_NEW_SCHEDULER") == "1"
return os.environ.get("RAY_ENABLE_NEW_SCHEDULER", "1") == "1"
+1 -10
View File
@@ -10,6 +10,7 @@ SRCS = [] + select({
py_test_module_list(
files = [
# "test_dynres.py", # dyn res not implemented
"test_async.py",
"test_actor.py",
"test_actor_advanced.py",
@@ -40,16 +41,6 @@ py_test_module_list(
deps = ["//:ray_lib"],
)
py_test_module_list(
files = [
"test_dynres.py", # dyn res not implemented
],
size = "medium",
extra_srcs = SRCS,
tags = ["exclusive", "medium_size_python_tests_a_to_j", "new_scheduler_broken"],
deps = ["//:ray_lib"],
)
py_test_module_list(
files = [
"test_memory_limits.py",
+1 -1
View File
@@ -113,7 +113,7 @@ RAY_CONFIG(bool, lineage_pinning_enabled, false)
/// only to work with direct calls. Once direct calls are becoming
/// the default, this scheduler will also become the default.
RAY_CONFIG(bool, new_scheduler_enabled,
getenv("RAY_ENABLE_NEW_SCHEDULER") != nullptr &&
getenv("RAY_ENABLE_NEW_SCHEDULER") == nullptr ||
getenv("RAY_ENABLE_NEW_SCHEDULER") == std::string("1"))
// The max allowed size in bytes of a return object from direct actor calls.
+3
View File
@@ -2951,6 +2951,9 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request,
void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request,
rpc::GetNodeStatsReply *reply,
rpc::SendReplyCallback send_reply_callback) {
if (new_scheduler_enabled_) {
cluster_task_manager_->FillPendingActorInfo(reply);
}
for (const auto &task : local_queues_.GetTasks(TaskState::INFEASIBLE)) {
if (task.GetTaskSpecification().IsActorCreationTask()) {
auto infeasible_task = reply->add_infeasible_tasks();
@@ -8,6 +8,9 @@
namespace ray {
namespace raylet {
// The max number of pending actors to report in node stats.
const int kMaxPendingActorsToReport = 20;
ClusterTaskManager::ClusterTaskManager(
const NodeID &self_node_id,
std::shared_ptr<ClusterResourceScheduler> cluster_resource_scheduler,
@@ -330,6 +333,39 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) {
return false;
}
void ClusterTaskManager::FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const {
// Report infeasible actors.
int num_reported = 0;
for (const auto &shapes_it : infeasible_tasks_) {
auto &work_queue = shapes_it.second;
for (const auto &work_it : work_queue) {
Task task = std::get<0>(work_it);
if (task.GetTaskSpecification().IsActorCreationTask()) {
if (num_reported++ > kMaxPendingActorsToReport) {
break; // Protect the raylet from reporting too much data.
}
auto infeasible_task = reply->add_infeasible_tasks();
infeasible_task->CopyFrom(task.GetTaskSpecification().GetMessage());
}
}
}
// Report actors blocked on resources.
num_reported = 0;
for (const auto &shapes_it : boost::join(tasks_to_dispatch_, tasks_to_schedule_)) {
auto &work_queue = shapes_it.second;
for (const auto &work_it : work_queue) {
Task task = std::get<0>(work_it);
if (task.GetTaskSpecification().IsActorCreationTask()) {
if (num_reported++ > kMaxPendingActorsToReport) {
break; // Protect the raylet from reporting too much data.
}
auto ready_task = reply->add_infeasible_tasks();
ready_task->CopyFrom(task.GetTaskSpecification().GetMessage());
}
}
}
}
void ClusterTaskManager::FillResourceUsage(
bool light_report_resource_usage_enabled,
std::shared_ptr<rpc::ResourcesData> data) const {
@@ -105,6 +105,11 @@ class ClusterTaskManager {
/// false if the task is already running.
bool CancelTask(const TaskID &task_id);
/// Populate the list of pending or infeasible actor tasks for node stats.
///
/// \param Output parameter.
void FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const;
/// Populate the relevant parts of the heartbeat table. This is intended for
/// sending raylet <-> gcs heartbeats. In particular, this should fill in
/// resource_load and resource_load_by_shape.