From 416b45e78e5ec03d21e32b4fc9aa6953b7cdf70b Mon Sep 17 00:00:00 2001 From: khu Date: Sun, 17 Jan 2021 16:48:09 -0800 Subject: [PATCH] [core] remove pushing RayTaskError to driver Fix [Core] [Error Message] Possible unhandled error from worker #12018 --- python/ray/_raylet.pyx | 5 ---- python/ray/ray_constants.py | 1 - python/ray/tests/test_failure.py | 41 ++++---------------------------- python/ray/worker.py | 6 +---- 4 files changed, 6 insertions(+), 47 deletions(-) diff --git a/python/ray/_raylet.pyx b/python/ray/_raylet.pyx index 8ba80852f..de6258826 100644 --- a/python/ray/_raylet.pyx +++ b/python/ray/_raylet.pyx @@ -531,11 +531,6 @@ cdef execute_task( errors.append(failure_object) core_worker.store_task_outputs( worker, errors, c_return_ids, returns) - ray.utils.push_error_to_driver( - worker, - ray_constants.TASK_PUSH_ERROR, - str(failure_object), - job_id=worker.current_job_id) if execution_info.max_calls != 0: # Reset the state of the worker for the next task to execute. diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index a5459b863..8b2c0980e 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -111,7 +111,6 @@ def to_memory_units(memory_bytes, round_up): WAIT_FOR_CLASS_PUSH_ERROR = "wait_for_class" PICKLING_LARGE_OBJECT_PUSH_ERROR = "pickling_large_object" WAIT_FOR_FUNCTION_PUSH_ERROR = "wait_for_function" -TASK_PUSH_ERROR = "task" REGISTER_REMOTE_FUNCTION_PUSH_ERROR = "register_remote_function" FUNCTION_TO_RUN_PUSH_ERROR = "function_to_run" VERSION_MISMATCH_PUSH_ERROR = "version_mismatch" diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index f01868989..0edd05766 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -33,16 +33,6 @@ def test_failed_task(ray_start_regular, error_pubsub): def throw_exception_fct3(x): raise Exception("Test function 3 intentionally failed.") - p = error_pubsub - - throw_exception_fct1.remote() - throw_exception_fct1.remote() - - msgs = get_error_message(p, 2, ray_constants.TASK_PUSH_ERROR) - assert len(msgs) == 2 - for msg in msgs: - assert "Test function 1 intentionally failed." in msg.error_message - x = throw_exception_fct2.remote() try: ray.get(x) @@ -233,7 +223,7 @@ def temporary_helper_function(): foo = Foo.remote(3, arg2=0) errors = get_error_message(p, 2) - assert len(errors) == 2 + assert len(errors) == 1 for error in errors: # Wait for the error to arrive. @@ -249,13 +239,6 @@ def temporary_helper_function(): with pytest.raises(Exception, match="failed to be imported"): ray.get(foo.get_val.remote(1, arg2=2)) - # Wait for the error from when the call to get_val. - errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR) - assert len(errors) == 1 - assert errors[0].type == ray_constants.TASK_PUSH_ERROR - assert ("failed to be imported, and so cannot execute this method" in - errors[0].error_message) - f.close() # Clean up the junk we added to sys.path. @@ -276,19 +259,8 @@ def test_failed_actor_init(ray_start_regular, error_pubsub): raise Exception(error_message2) a = FailedActor.remote() - - # Make sure that we get errors from a failed constructor. - errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR) - assert len(errors) == 1 - assert errors[0].type == ray_constants.TASK_PUSH_ERROR - assert error_message1 in errors[0].error_message - - # Make sure that we get errors from a failed method. - a.fail_method.remote() - errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR) - assert len(errors) == 1 - assert errors[0].type == ray_constants.TASK_PUSH_ERROR - assert error_message1 in errors[0].error_message + with pytest.raises(Exception, match="actor constructor failed"): + ray.get(a.fail_method.remote()) def test_failed_actor_method(ray_start_regular, error_pubsub): @@ -306,11 +278,8 @@ def test_failed_actor_method(ray_start_regular, error_pubsub): a = FailedActor.remote() # Make sure that we get errors from a failed method. - a.fail_method.remote() - errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR) - assert len(errors) == 1 - assert errors[0].type == ray_constants.TASK_PUSH_ERROR - assert error_message2 in errors[0].error_message + with pytest.raises(Exception, match="actor method failed"): + ray.get(a.fail_method.remote()) def test_incorrect_method_calls(ray_start_regular): diff --git a/python/ray/worker.py b/python/ray/worker.py index 350bbc649..ae997f3cd 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1100,11 +1100,7 @@ def listen_error_messages_raylet(worker, task_error_queue, threads_stopped): continue error_message = error_data.error_message - if (error_data.type == ray_constants.TASK_PUSH_ERROR): - # Delay it a bit to see if we can suppress it - task_error_queue.put((error_message, time.time())) - else: - logger.warning(error_message) + logger.warning(error_message) except (OSError, redis.exceptions.ConnectionError) as e: logger.error(f"listen_error_messages_raylet: {e}") finally: