diff --git a/python/ray/tests/test_failure.py b/python/ray/tests/test_failure.py index f9f655936..fd4dd371b 100644 --- a/python/ray/tests/test_failure.py +++ b/python/ray/tests/test_failure.py @@ -10,6 +10,7 @@ import pytest import redis import ray +import ray.utils import ray.ray_constants as ray_constants from ray.exceptions import RayTaskError from ray.cluster_utils import Cluster @@ -82,6 +83,20 @@ def test_failed_task(ray_start_regular, error_pubsub): assert False +def test_push_error_to_driver_through_redis(ray_start_regular, error_pubsub): + address_info = ray_start_regular + address = address_info["redis_address"] + redis_client = ray.services.create_redis_client( + address, password=ray.ray_constants.REDIS_DEFAULT_PASSWORD) + error_message = "Test error message" + ray.utils.push_error_to_driver_through_redis( + redis_client, ray_constants.DASHBOARD_AGENT_DIED_ERROR, error_message) + errors = get_error_message(error_pubsub, 1, + ray_constants.DASHBOARD_AGENT_DIED_ERROR) + assert errors[0].type == ray_constants.DASHBOARD_AGENT_DIED_ERROR + assert errors[0].error_message == error_message + + def test_get_throws_quickly_when_found_exception(ray_start_regular): # We use an actor instead of functions here. If we use functions, it's # very likely that two normal tasks are submitted before the first worker diff --git a/python/ray/utils.py b/python/ray/utils.py index e5d400ece..e199db116 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -126,7 +126,7 @@ def push_error_to_driver_through_redis(redis_client, pubsub_msg.id = job_id.binary() pubsub_msg.data = error_data redis_client.publish("ERROR_INFO:" + job_id.hex(), - pubsub_msg.SerializeAsString()) + pubsub_msg.SerializeToString()) def is_cython(obj):