From bd64c940e904fe312c5e68a431240687dd57b1fd Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Fri, 7 Sep 2018 17:42:45 -0700 Subject: [PATCH] Push error to driver when monitor raises an exception. (#2834) --- python/ray/experimental/state.py | 1 + python/ray/gcs_utils.py | 7 ++++++- python/ray/monitor.py | 21 ++++++++++++++++++--- python/ray/ray_constants.py | 1 + python/ray/utils.py | 2 +- python/ray/worker.py | 1 - test/failure_test.py | 30 ++++++++++++++++++++++++++++-- 7 files changed, 55 insertions(+), 8 deletions(-) diff --git a/python/ray/experimental/state.py b/python/ray/experimental/state.py index 687fbdae5..08eb29759 100644 --- a/python/ray/experimental/state.py +++ b/python/ray/experimental/state.py @@ -1320,6 +1320,7 @@ class GlobalState(object): for i in range(gcs_entries.EntriesLength()): error_data = ray.gcs_utils.ErrorTableData.GetRootAsErrorTableData( gcs_entries.Entries(i), 0) + assert job_id.id() == error_data.JobId() error_message = { "type": decode(error_data.Type()), "message": decode(error_data.ErrorMessage()), diff --git a/python/ray/gcs_utils.py b/python/ray/gcs_utils.py index 53fa9d8d0..8db177254 100644 --- a/python/ray/gcs_utils.py +++ b/python/ray/gcs_utils.py @@ -58,10 +58,12 @@ TablePrefix_ERROR_INFO_string = "ERROR_INFO" TablePrefix_PROFILE_string = "PROFILE" -def construct_error_message(error_type, message, timestamp): +def construct_error_message(driver_id, error_type, message, timestamp): """Construct a serialized ErrorTableData object. Args: + driver_id: The ID of the driver that the error should go to. If this is + nil, then the error will go to all drivers. error_type: The type of the error. message: The error message. timestamp: The time of the error. @@ -70,10 +72,13 @@ def construct_error_message(error_type, message, timestamp): The serialized object. """ builder = flatbuffers.Builder(0) + driver_offset = builder.CreateString(driver_id) error_type_offset = builder.CreateString(error_type) message_offset = builder.CreateString(message) ray.core.generated.ErrorTableData.ErrorTableDataStart(builder) + ray.core.generated.ErrorTableData.ErrorTableDataAddJobId( + builder, driver_offset) ray.core.generated.ErrorTableData.ErrorTableDataAddType( builder, error_type_offset) ray.core.generated.ErrorTableData.ErrorTableDataAddErrorMessage( diff --git a/python/ray/monitor.py b/python/ray/monitor.py index bcaab4d2a..ccd2766bb 100644 --- a/python/ray/monitor.py +++ b/python/ray/monitor.py @@ -8,6 +8,7 @@ import logging import os import time from collections import Counter, defaultdict +import traceback import redis @@ -316,8 +317,9 @@ class Monitor(object): if ip: self.load_metrics.update(ip, static_resources, dynamic_resources) else: - print("Warning: could not find ip for client {} in {}.".format( - client_id, self.local_scheduler_id_to_ip_map)) + logger.warning( + "Warning: could not find ip for client {} in {}.".format( + client_id, self.local_scheduler_id_to_ip_map)) def xray_heartbeat_handler(self, unused_channel, data): """Handle an xray heartbeat message from Redis.""" @@ -809,4 +811,17 @@ if __name__ == "__main__": autoscaling_config = None monitor = Monitor(redis_ip_address, redis_port, autoscaling_config) - monitor.run() + + try: + monitor.run() + except Exception as e: + # Something went wrong, so push an error to all drivers. + redis_client = redis.StrictRedis( + host=redis_ip_address, port=redis_port) + traceback_str = ray.utils.format_error_message(traceback.format_exc()) + message = "The monitor failed with the following error:\n{}".format( + traceback_str) + ray.utils.push_error_to_driver_through_redis( + redis_client, monitor.use_raylet, ray_constants.MONITOR_DIED_ERROR, + message) + raise e diff --git a/python/ray/ray_constants.py b/python/ray/ray_constants.py index 9986cb41d..a9e4519d4 100644 --- a/python/ray/ray_constants.py +++ b/python/ray/ray_constants.py @@ -44,6 +44,7 @@ PUT_RECONSTRUCTION_PUSH_ERROR = "put_reconstruction" HASH_MISMATCH_PUSH_ERROR = "object_hash_mismatch" INFEASIBLE_TASK_ERROR = "infeasible_task" REMOVED_NODE_ERROR = "node_removed" +MONITOR_DIED_ERROR = "monitor_died" # Abort autoscaling if more than this number of errors are encountered. This # is a safety feature to prevent e.g. runaway node launches. diff --git a/python/ray/utils.py b/python/ray/utils.py index d5c348289..82a37b67e 100644 --- a/python/ray/utils.py +++ b/python/ray/utils.py @@ -122,7 +122,7 @@ def push_error_to_driver_through_redis(redis_client, # Do everything in Python and through the Python Redis client instead # of through the raylet. error_data = ray.gcs_utils.construct_error_message( - error_type, message, time.time()) + driver_id, error_type, message, time.time()) redis_client.execute_command( "RAY.TABLE_APPEND", ray.gcs_utils.TablePrefix.ERROR_INFO, ray.gcs_utils.TablePubsub.ERROR_INFO, driver_id, error_data) diff --git a/python/ray/worker.py b/python/ray/worker.py index 9d04aa137..2ac5d931f 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1724,7 +1724,6 @@ def init(redis_address=None, redirect_worker_output=False, redirect_output=True, ignore_reinit_error=False, - num_custom_resource=None, num_redis_shards=None, redis_max_clients=None, redis_protected_mode=True, diff --git a/test/failure_test.py b/test/failure_test.py index a8dde76b8..6370c4633 100644 --- a/test/failure_test.py +++ b/test/failure_test.py @@ -35,6 +35,13 @@ def ray_start_regular(): ray.shutdown() +@pytest.fixture +def shutdown_only(): + yield None + # The code after the yield will run as teardown code. + ray.shutdown() + + def test_failed_task(ray_start_regular): @ray.remote def throw_exception_fct1(): @@ -445,7 +452,7 @@ def test_put_error2(ray_start_object_store_memory): wait_for_errors(ray_constants.PUT_RECONSTRUCTION_PUSH_ERROR, 1) -def test_version_mismatch(): +def test_version_mismatch(shutdown_only): ray_version = ray.__version__ ray.__version__ = "fake ray version" @@ -456,7 +463,26 @@ def test_version_mismatch(): # Reset the version. ray.__version__ = ray_version - ray.shutdown() + +@pytest.mark.skipif( + os.environ.get("RAY_USE_XRAY") != "1", + reason="This test only works with xray.") +def test_warning_monitor_died(shutdown_only): + ray.init(num_cpus=0) + + time.sleep(1) # Make sure the monitor has started. + + # Cause the monitor to raise an exception by pushing a malformed message to + # Redis. This will probably kill the raylets and the raylet_monitor in + # addition to the monitor. + fake_id = 20 * b"\x00" + malformed_message = "asdf" + redis_client = ray.worker.global_state.redis_clients[0] + redis_client.execute_command( + "RAY.TABLE_ADD", ray.gcs_utils.TablePrefix.HEARTBEAT, + ray.gcs_utils.TablePubsub.HEARTBEAT, fake_id, malformed_message) + + wait_for_errors(ray_constants.MONITOR_DIED_ERROR, 1) def test_export_large_objects(ray_start_regular):