diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 384afddd8..a835801bd 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -42,6 +42,7 @@ WORKER_CRASH_PUSH_ERROR = "worker_crash" WORKER_DIED_PUSH_ERROR = "worker_died" PUT_RECONSTRUCTION_PUSH_ERROR = "put_reconstruction" HASH_MISMATCH_PUSH_ERROR = "object_hash_mismatch" +INFEASIBLE_TASK_ERROR = "infeasible_task" # Abort autoscaling if more than this number of errors are encountered. This # is a safety feature to prevent e.g. runaway node launches. diff --git a/src/ray/raylet/node_manager.cc b/src/ray/raylet/node_manager.cc index 7803a031c..d850cbc09 100644 --- a/src/ray/raylet/node_manager.cc +++ b/src/ray/raylet/node_manager.cc @@ -842,6 +842,19 @@ void NodeManager::ScheduleTasks( for (const auto &task : local_queues_.GetPlaceableTasks()) { task_dependency_manager_.TaskPending(task); move_task_set.insert(task.GetTaskSpecification().TaskId()); + // Push a warning to the task's driver that this task is currently infeasible. + { + // TODO(rkn): Define this constant somewhere else. + std::string type = "infeasible_task"; + std::ostringstream error_message; + error_message << "The task with ID " << task.GetTaskSpecification().TaskId() + << " is infeasible and cannot currently be executed. " + << "It requested " + << task.GetTaskSpecification().GetRequiredResources().ToString(); + RAY_CHECK_OK(gcs_client_->error_table().PushErrorToDriver( + task.GetTaskSpecification().DriverId(), type, error_message.str(), + current_time_ms())); + } // Assert that this placeable task is not feasible locally (necessary but not // sufficient). RAY_CHECK(!task.GetTaskSpecification().GetRequiredResources().IsSubset( diff --git a/test/failure_test.py b/test/failure_test.py index e438771d6..89149ea4e 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -480,3 +480,26 @@ def test_export_large_objects(ray_start_regular): # Make sure that a warning is generated. wait_for_errors(ray_constants.PICKLING_LARGE_OBJECT_PUSH_ERROR, 2) + + +@pytest.mark.skipif( + os.environ.get("RAY_USE_XRAY") != "1", + reason="This test only works with xray.") +def test_warning_for_infeasible_tasks(ray_start_regular): + # Check that we get warning messages for infeasible tasks. + + @ray.remote(num_gpus=1) + def f(): + pass + + @ray.remote(resources={"Custom": 1}) + class Foo(object): + pass + + # This task is infeasible. + f.remote() + wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 1) + + # This actor placement task is infeasible. + Foo.remote() + wait_for_errors(ray_constants.INFEASIBLE_TASK_ERROR, 2)