diff --git a/.travis.yml b/.travis.yml index a8ddb0d46..e2d3822a2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -20,50 +20,6 @@ before_install: matrix: include: - - os: linux - env: - - PYTHON=3.6 SMALL_AND_LARGE_TESTS=1 RAY_ENABLE_NEW_SCHEDULER=1 - - PYTHONWARNINGS=ignore - - RAY_DEFAULT_BUILD=1 - - RAY_CYTHON_EXAMPLES=1 - - RAY_USE_RANDOM_PORTS=1 - install: - - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED,RAY_CI_DASHBOARD_AFFECTED - before_script: - - . ./ci/travis/ci.sh build - script: - # bazel python tests. This should be run last to keep its logs at the end of travis logs. - - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then ./ci/keep_alive bazel test --config=ci $(./scripts/bazel_export_options) --test_tag_filters=-jenkins_only,-medium_size_python_tests_a_to_j,-medium_size_python_tests_k_to_z,-new_scheduler_broken python/ray/tests/...; fi - - - os: linux - env: - - PYTHON=3.6 MEDIUM_TESTS_A_TO_J=1 RAY_ENABLE_NEW_SCHEDULER=1 - - PYTHONWARNINGS=ignore - - RAY_DEFAULT_BUILD=1 - - RAY_CYTHON_EXAMPLES=1 - - RAY_USE_RANDOM_PORTS=1 - install: - - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED,RAY_CI_DASHBOARD_AFFECTED - before_script: - - . ./ci/travis/ci.sh build - script: - # bazel python tests for medium size tests. Used for parallelization. - - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then ./ci/keep_alive bazel test --config=ci $(./scripts/bazel_export_options) --test_tag_filters=-jenkins_only,medium_size_python_tests_a_to_j,-new_scheduler_broken python/ray/tests/...; fi - - - os: linux - env: - - PYTHON=3.6 MEDIUM_TESTS_K_TO_Z=1 RAY_ENABLE_NEW_SCHEDULER=1 - - PYTHONWARNINGS=ignore - - RAY_DEFAULT_BUILD=1 - - RAY_CYTHON_EXAMPLES=1 - - RAY_USE_RANDOM_PORTS=1 - install: - - . ./ci/travis/ci.sh init RAY_CI_SERVE_AFFECTED,RAY_CI_TUNE_AFFECTED,RAY_CI_PYTHON_AFFECTED,RAY_CI_DASHBOARD_AFFECTED - before_script: - - . ./ci/travis/ci.sh build - script: - # bazel python tests for medium size tests. Used for parallelization. - - if [ $RAY_CI_PYTHON_AFFECTED == "1" ]; then ./ci/keep_alive bazel test --config=ci $(./scripts/bazel_export_options) --test_tag_filters=-jenkins_only,medium_size_python_tests_k_to_z,-new_scheduler_broken python/ray/tests/...; fi - os: linux env: - PYTHON=3.6 SMALL_AND_LARGE_TESTS=1 diff --git a/dashboard/modules/logical_view/tests/test_logical_view_head.py b/dashboard/modules/logical_view/tests/test_logical_view_head.py index 2144918a4..5e4a8bb6c 100644 --- a/dashboard/modules/logical_view/tests/test_logical_view_head.py +++ b/dashboard/modules/logical_view/tests/test_logical_view_head.py @@ -35,7 +35,7 @@ def test_actor_groups(ray_start_with_dashboard): assert wait_until_server_available(webui_url) webui_url = format_web_url(webui_url) - timeout_seconds = 5 + timeout_seconds = 10 start_time = time.time() last_ex = None while True: diff --git a/java/test/src/main/java/io/ray/test/DynamicResourceTest.java b/java/test/src/main/java/io/ray/test/DynamicResourceTest.java index a103d6943..eeaa55b2c 100644 --- a/java/test/src/main/java/io/ray/test/DynamicResourceTest.java +++ b/java/test/src/main/java/io/ray/test/DynamicResourceTest.java @@ -15,7 +15,8 @@ public class DynamicResourceTest extends BaseTest { return "hi"; } - @Test(groups = {"cluster"}) + // Dynamic resources not supported yet. + @Test(groups = {"cluster"}, enabled = false) public void testSetResource() { // Call a task in advance to warm up the cluster to avoid being too slow to start workers. TestUtils.warmUpCluster(); diff --git a/python/ray/test_utils.py b/python/ray/test_utils.py index 7f6aaa360..594431e2f 100644 --- a/python/ray/test_utils.py +++ b/python/ray/test_utils.py @@ -442,4 +442,4 @@ def format_web_url(url): def new_scheduler_enabled(): - return os.environ.get("RAY_ENABLE_NEW_SCHEDULER") == "1" + return os.environ.get("RAY_ENABLE_NEW_SCHEDULER", "1") == "1" diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 588710e3a..c5837a158 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -10,6 +10,7 @@ SRCS = [] + select({ py_test_module_list( files = [ +# "test_dynres.py", # dyn res not implemented "test_async.py", "test_actor.py", "test_actor_advanced.py", @@ -40,16 +41,6 @@ py_test_module_list( deps = ["//:ray_lib"], ) -py_test_module_list( - files = [ - "test_dynres.py", # dyn res not implemented - ], - size = "medium", - extra_srcs = SRCS, - tags = ["exclusive", "medium_size_python_tests_a_to_j", "new_scheduler_broken"], - deps = ["//:ray_lib"], -) - py_test_module_list( files = [ "test_memory_limits.py", diff --git a/src/ray/common/ray_config_def.h b/src/ray/common/ray_config_def.h index f5f420463..9f9392bf7 100644 --- a/src/ray/common/ray_config_def.h +++ b/src/ray/common/ray_config_def.h @@ -113,7 +113,7 @@ RAY_CONFIG(bool, lineage_pinning_enabled, false) /// only to work with direct calls. Once direct calls are becoming /// the default, this scheduler will also become the default. RAY_CONFIG(bool, new_scheduler_enabled, - getenv("RAY_ENABLE_NEW_SCHEDULER") != nullptr && + getenv("RAY_ENABLE_NEW_SCHEDULER") == nullptr || getenv("RAY_ENABLE_NEW_SCHEDULER") == std::string("1")) // The max allowed size in bytes of a return object from direct actor calls. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 9dd1f25b8..e78820d42 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -2951,6 +2951,9 @@ void NodeManager::HandlePinObjectIDs(const rpc::PinObjectIDsRequest &request, void NodeManager::HandleGetNodeStats(const rpc::GetNodeStatsRequest &node_stats_request, rpc::GetNodeStatsReply *reply, rpc::SendReplyCallback send_reply_callback) { + if (new_scheduler_enabled_) { + cluster_task_manager_->FillPendingActorInfo(reply); + } for (const auto &task : local_queues_.GetTasks(TaskState::INFEASIBLE)) { if (task.GetTaskSpecification().IsActorCreationTask()) { auto infeasible_task = reply->add_infeasible_tasks(); diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 09db70f1b..12715430e 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -8,6 +8,9 @@ namespace ray { namespace raylet { +// The max number of pending actors to report in node stats. +const int kMaxPendingActorsToReport = 20; + ClusterTaskManager::ClusterTaskManager( const NodeID &self_node_id, std::shared_ptr cluster_resource_scheduler, @@ -330,6 +333,39 @@ bool ClusterTaskManager::CancelTask(const TaskID &task_id) { return false; } +void ClusterTaskManager::FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const { + // Report infeasible actors. + int num_reported = 0; + for (const auto &shapes_it : infeasible_tasks_) { + auto &work_queue = shapes_it.second; + for (const auto &work_it : work_queue) { + Task task = std::get<0>(work_it); + if (task.GetTaskSpecification().IsActorCreationTask()) { + if (num_reported++ > kMaxPendingActorsToReport) { + break; // Protect the raylet from reporting too much data. + } + auto infeasible_task = reply->add_infeasible_tasks(); + infeasible_task->CopyFrom(task.GetTaskSpecification().GetMessage()); + } + } + } + // Report actors blocked on resources. + num_reported = 0; + for (const auto &shapes_it : boost::join(tasks_to_dispatch_, tasks_to_schedule_)) { + auto &work_queue = shapes_it.second; + for (const auto &work_it : work_queue) { + Task task = std::get<0>(work_it); + if (task.GetTaskSpecification().IsActorCreationTask()) { + if (num_reported++ > kMaxPendingActorsToReport) { + break; // Protect the raylet from reporting too much data. + } + auto ready_task = reply->add_infeasible_tasks(); + ready_task->CopyFrom(task.GetTaskSpecification().GetMessage()); + } + } + } +} + void ClusterTaskManager::FillResourceUsage( bool light_report_resource_usage_enabled, std::shared_ptr data) const { diff --git a/src/ray/raylet/scheduling/cluster_task_manager.h b/src/ray/raylet/scheduling/cluster_task_manager.h index aabc0a6fb..3e3ff2e44 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.h +++ b/src/ray/raylet/scheduling/cluster_task_manager.h @@ -105,6 +105,11 @@ class ClusterTaskManager { /// false if the task is already running. bool CancelTask(const TaskID &task_id); + /// Populate the list of pending or infeasible actor tasks for node stats. + /// + /// \param Output parameter. + void FillPendingActorInfo(rpc::GetNodeStatsReply *reply) const; + /// Populate the relevant parts of the heartbeat table. This is intended for /// sending raylet <-> gcs heartbeats. In particular, this should fill in /// resource_load and resource_load_by_shape.