[core] remove pushing RayTaskError to driver

Fix [Core] [Error Message] Possible unhandled error from worker #12018
This commit is contained in:
khu
2021-01-17 16:48:09 -08:00
parent 2cd51ce608
commit 416b45e78e
4 changed files with 6 additions and 47 deletions
-5
View File
@@ -531,11 +531,6 @@ cdef execute_task(
errors.append(failure_object)
core_worker.store_task_outputs(
worker, errors, c_return_ids, returns)
ray.utils.push_error_to_driver(
worker,
ray_constants.TASK_PUSH_ERROR,
str(failure_object),
job_id=worker.current_job_id)
if execution_info.max_calls != 0:
# Reset the state of the worker for the next task to execute.
-1
View File
@@ -111,7 +111,6 @@ def to_memory_units(memory_bytes, round_up):
WAIT_FOR_CLASS_PUSH_ERROR = "wait_for_class"
PICKLING_LARGE_OBJECT_PUSH_ERROR = "pickling_large_object"
WAIT_FOR_FUNCTION_PUSH_ERROR = "wait_for_function"
TASK_PUSH_ERROR = "task"
REGISTER_REMOTE_FUNCTION_PUSH_ERROR = "register_remote_function"
FUNCTION_TO_RUN_PUSH_ERROR = "function_to_run"
VERSION_MISMATCH_PUSH_ERROR = "version_mismatch"
+5 -36
View File
@@ -33,16 +33,6 @@ def test_failed_task(ray_start_regular, error_pubsub):
def throw_exception_fct3(x):
raise Exception("Test function 3 intentionally failed.")
p = error_pubsub
throw_exception_fct1.remote()
throw_exception_fct1.remote()
msgs = get_error_message(p, 2, ray_constants.TASK_PUSH_ERROR)
assert len(msgs) == 2
for msg in msgs:
assert "Test function 1 intentionally failed." in msg.error_message
x = throw_exception_fct2.remote()
try:
ray.get(x)
@@ -233,7 +223,7 @@ def temporary_helper_function():
foo = Foo.remote(3, arg2=0)
errors = get_error_message(p, 2)
assert len(errors) == 2
assert len(errors) == 1
for error in errors:
# Wait for the error to arrive.
@@ -249,13 +239,6 @@ def temporary_helper_function():
with pytest.raises(Exception, match="failed to be imported"):
ray.get(foo.get_val.remote(1, arg2=2))
# Wait for the error from when the call to get_val.
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.TASK_PUSH_ERROR
assert ("failed to be imported, and so cannot execute this method" in
errors[0].error_message)
f.close()
# Clean up the junk we added to sys.path.
@@ -276,19 +259,8 @@ def test_failed_actor_init(ray_start_regular, error_pubsub):
raise Exception(error_message2)
a = FailedActor.remote()
# Make sure that we get errors from a failed constructor.
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.TASK_PUSH_ERROR
assert error_message1 in errors[0].error_message
# Make sure that we get errors from a failed method.
a.fail_method.remote()
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.TASK_PUSH_ERROR
assert error_message1 in errors[0].error_message
with pytest.raises(Exception, match="actor constructor failed"):
ray.get(a.fail_method.remote())
def test_failed_actor_method(ray_start_regular, error_pubsub):
@@ -306,11 +278,8 @@ def test_failed_actor_method(ray_start_regular, error_pubsub):
a = FailedActor.remote()
# Make sure that we get errors from a failed method.
a.fail_method.remote()
errors = get_error_message(p, 1, ray_constants.TASK_PUSH_ERROR)
assert len(errors) == 1
assert errors[0].type == ray_constants.TASK_PUSH_ERROR
assert error_message2 in errors[0].error_message
with pytest.raises(Exception, match="actor method failed"):
ray.get(a.fail_method.remote())
def test_incorrect_method_calls(ray_start_regular):
+1 -5
View File
@@ -1100,11 +1100,7 @@ def listen_error_messages_raylet(worker, task_error_queue, threads_stopped):
continue
error_message = error_data.error_message
if (error_data.type == ray_constants.TASK_PUSH_ERROR):
# Delay it a bit to see if we can suppress it
task_error_queue.put((error_message, time.time()))
else:
logger.warning(error_message)
logger.warning(error_message)
except (OSError, redis.exceptions.ConnectionError) as e:
logger.error(f"listen_error_messages_raylet: {e}")
finally: