From 067976ad3d228d309f92a517d412d088f5f27a57 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sat, 5 Jan 2019 13:27:32 -0800 Subject: [PATCH] Push a warning to all users when large number of workers have been started. (#3645) * Push a warning to all users when large number of workers have been started. * Add test. * Fix bug. * Give warning when worker starts instead of when worker registers. * Fix * Fix tests --- python/ray/ray_constants.py | 1 + src/ray/raylet/node_manager.cc | 7 +++ src/ray/raylet/worker_pool.cc | 24 +++++++++- src/ray/raylet/worker_pool.h | 13 +++++ test/failure_test.py | 88 ++++++++++++++++++++++++++-------- 5 files changed, 111 insertions(+), 22 deletions(-) diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index aaa767a3d..0ca708ec3 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -52,6 +52,7 @@ CHECKPOINT_PUSH_ERROR = "checkpoint" REGISTER_ACTOR_PUSH_ERROR = "register_actor" WORKER_CRASH_PUSH_ERROR = "worker_crash" WORKER_DIED_PUSH_ERROR = "worker_died" +WORKER_POOL_LARGE_ERROR = "worker_pool_large" PUT_RECONSTRUCTION_PUSH_ERROR = "put_reconstruction" INFEASIBLE_TASK_ERROR = "infeasible_task" REMOVED_NODE_ERROR = "node_removed" diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index b14479084..01bcedd99 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -1472,6 +1472,13 @@ bool NodeManager::AssignTask(const Task &task) { // There are no more non-actor workers available to execute this task. // Start a new worker. worker_pool_.StartWorkerProcess(spec.GetLanguage()); + // Push an error message to the user if the worker pool tells us that it is + // getting too big. + const std::string warning_message = worker_pool_.WarningAboutSize(); + if (warning_message != "") { + RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( + JobID::nil(), "worker_pool_large", warning_message, current_time_ms())); + } } // We couldn't assign this task, as no worker available. return false; diff --git a/src/ray/raylet/worker_pool.cc b/src/ray/raylet/worker_pool.cc index 6f5e25d0b..daf928f98 100644 --- a/src/ray/raylet/worker_pool.cc +++ b/src/ray/raylet/worker_pool.cc @@ -42,7 +42,9 @@ WorkerPool::WorkerPool( int maximum_startup_concurrency, const std::unordered_map> &worker_commands) : num_workers_per_process_(num_workers_per_process), - maximum_startup_concurrency_(maximum_startup_concurrency) { + multiple_for_warning_(std::max(num_worker_processes, maximum_startup_concurrency)), + maximum_startup_concurrency_(maximum_startup_concurrency), + last_warning_multiple_(0) { RAY_CHECK(num_workers_per_process > 0) << "num_workers_per_process must be positive."; RAY_CHECK(maximum_startup_concurrency > 0); // Ignore SIGCHLD signals. If we don't do this, then worker processes will @@ -250,6 +252,26 @@ std::vector> WorkerPool::GetWorkersRunningTasksForDriver return workers; } +std::string WorkerPool::WarningAboutSize() { + int64_t num_workers_started_or_registered = starting_worker_processes_.size(); + for (const auto &entry : states_by_lang_) { + num_workers_started_or_registered += + static_cast(entry.second.registered_workers.size()); + } + int64_t multiple = num_workers_started_or_registered / multiple_for_warning_; + std::stringstream warning_message; + if (multiple >= 3 && multiple > last_warning_multiple_) { + last_warning_multiple_ = multiple; + warning_message << "WARNING: " << num_workers_started_or_registered + << " workers have been started. This could be a result of using " + << "a large number of actors, or it could be a consequence of " + << "using nested tasks " + << "(see https://github.com/ray-project/ray/issues/3644) for " + << "some a discussion of workarounds."; + } + return warning_message.str(); +} + std::string WorkerPool::DebugString() const { std::stringstream result; result << "WorkerPool:"; diff --git a/src/ray/raylet/worker_pool.h b/src/ray/raylet/worker_pool.h index 32360b769..50d7c8998 100644 --- a/src/ray/raylet/worker_pool.h +++ b/src/ray/raylet/worker_pool.h @@ -123,6 +123,13 @@ class WorkerPool { /// \return string. std::string DebugString() const; + /// Generate a warning about the number of workers that have registered or + /// started if appropriate. + /// + /// \return An empty string if no warning should be generated and otherwise a + /// string with a warning message. + std::string WarningAboutSize(); + protected: /// A map from the pids of starting worker processes /// to the number of their unregistered workers. @@ -150,10 +157,16 @@ class WorkerPool { /// for a given language. inline State &GetStateForLanguage(const Language &language); + /// We'll push a warning to the user every time a multiple of this many + /// workers has been started. + int multiple_for_warning_; /// The maximum number of workers that can be started concurrently. int maximum_startup_concurrency_; /// Pool states per language. std::unordered_map states_by_lang_; + /// The last size at which a warning about the number of registered workers + /// was generated. + int64_t last_warning_multiple_; }; } // namespace raylet diff --git a/test/failure_test.py b/test/failure_test.py index e3b5223bc..d75874dfc 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -122,8 +122,10 @@ def temporary_helper_function(): return module.temporary_python_file() wait_for_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR, 2) - assert "No module named" in ray.error_info()[0]["message"] - assert "No module named" in ray.error_info()[1]["message"] + errors = relevant_errors(ray_constants.REGISTER_REMOTE_FUNCTION_PUSH_ERROR) + assert len(errors) == 2 + assert "No module named" in errors[0]["message"] + assert "No module named" in errors[1]["message"] # Check that if we try to call the function it throws an exception and # does not hang. @@ -145,10 +147,10 @@ def test_failed_function_to_run(ray_start_regular): ray.worker.global_worker.run_function_on_all_workers(f) wait_for_errors(ray_constants.FUNCTION_TO_RUN_PUSH_ERROR, 2) # Check that the error message is in the task info. - error_info = ray.error_info() - assert len(error_info) == 2 - assert "Function to run failed." in error_info[0]["message"] - assert "Function to run failed." in error_info[1]["message"] + errors = relevant_errors(ray_constants.FUNCTION_TO_RUN_PUSH_ERROR) + assert len(errors) == 2 + assert "Function to run failed." in errors[0]["message"] + assert "Function to run failed." in errors[1]["message"] def test_fail_importing_actor(ray_start_regular): @@ -185,12 +187,14 @@ def temporary_helper_function(): # Wait for the error to arrive. wait_for_errors(ray_constants.REGISTER_ACTOR_PUSH_ERROR, 1) - assert "No module named" in ray.error_info()[0]["message"] + errors = relevant_errors(ray_constants.REGISTER_ACTOR_PUSH_ERROR) + assert "No module named" in errors[0]["message"] # Wait for the error from when the __init__ tries to run. wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1) + errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) assert ("failed to be imported, and so cannot execute this method" in - ray.error_info()[1]["message"]) + errors[0]["message"]) # Check that if we try to get the function it throws an exception and # does not hang. @@ -199,8 +203,9 @@ def temporary_helper_function(): # Wait for the error from when the call to get_val. wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2) + errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) assert ("failed to be imported, and so cannot execute this method" in - ray.error_info()[2]["message"]) + errors[1]["message"]) f.close() @@ -224,14 +229,16 @@ def test_failed_actor_init(ray_start_regular): # Make sure that we get errors from a failed constructor. wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1) - assert len(ray.error_info()) == 1 - assert error_message1 in ray.error_info()[0]["message"] + errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) + assert len(errors) == 1 + assert error_message1 in errors[0]["message"] # Make sure that we get errors from a failed method. a.fail_method.remote() wait_for_errors(ray_constants.TASK_PUSH_ERROR, 2) - assert len(ray.error_info()) == 2 - assert error_message1 in ray.error_info()[1]["message"] + errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) + assert len(errors) == 2 + assert error_message1 in errors[1]["message"] def test_failed_actor_method(ray_start_regular): @@ -250,8 +257,9 @@ def test_failed_actor_method(ray_start_regular): # Make sure that we get errors from a failed method. a.fail_method.remote() wait_for_errors(ray_constants.TASK_PUSH_ERROR, 1) - assert len(ray.error_info()) == 1 - assert error_message2 in ray.error_info()[0]["message"] + errors = relevant_errors(ray_constants.TASK_PUSH_ERROR) + assert len(errors) == 1 + assert error_message2 in errors[0]["message"] def test_incorrect_method_calls(ray_start_regular): @@ -301,7 +309,6 @@ def test_worker_raising_exception(ray_start_regular): wait_for_errors(ray_constants.WORKER_CRASH_PUSH_ERROR, 1) wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1) - assert len(ray.error_info()) == 2 def test_worker_dying(ray_start_regular): @@ -314,9 +321,9 @@ def test_worker_dying(ray_start_regular): wait_for_errors(ray_constants.WORKER_DIED_PUSH_ERROR, 1) - error_info = ray.error_info() - assert len(error_info) == 1 - assert "died or was killed while executing" in error_info[0]["message"] + errors = relevant_errors(ray_constants.WORKER_DIED_PUSH_ERROR) + assert len(errors) == 1 + assert "died or was killed while executing" in errors[0]["message"] def test_actor_worker_dying(ray_start_regular): @@ -571,6 +578,45 @@ def test_warning_for_infeasible_zero_cpu_actor(shutdown_only): wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 1) +def test_warning_for_too_many_actors(shutdown_only): + # Check that if we run a workload which requires too many workers to be + # started that we will receive a warning. + num_cpus = 2 + ray.init(num_cpus=num_cpus) + + @ray.remote + class Foo(object): + def __init__(self): + time.sleep(1000) + + [Foo.remote() for _ in range(num_cpus * 3)] + wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1) + [Foo.remote() for _ in range(num_cpus)] + wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 2) + + +def test_warning_for_too_many_nested_tasks(shutdown_only): + # Check that if we run a workload which requires too many workers to be + # started that we will receive a warning. + num_cpus = 2 + ray.init(num_cpus=num_cpus) + + @ray.remote + def f(): + time.sleep(1000) + return 1 + + @ray.remote + def g(): + # Sleep so that the f tasks all get submitted to the scheduler after + # the g tasks. + time.sleep(1) + ray.get(f.remote()) + + [g.remote() for _ in range(num_cpus * 4)] + wait_for_errors(ray_constants.WORKER_POOL_LARGE_ERROR, 1) + + @pytest.fixture def ray_start_two_nodes(): # Start the Ray processes. @@ -610,8 +656,8 @@ def test_warning_for_dead_node(ray_start_two_nodes): # Extract the client IDs from the error messages. This will need to be # changed if the error message changes. warning_client_ids = { - item['message'].split(' ')[5] - for item in ray.error_info() + item["message"].split(" ")[5] + for item in relevant_errors(ray_constants.REMOVED_NODE_ERROR) } assert client_ids == warning_client_ids