Push error to driver when monitor raises an exception. (#2834)

This commit is contained in:
Robert Nishihara
2018-09-07 17:42:45 -07:00
committed by Philipp Moritz
parent 753ba76141
commit bd64c940e9
7 changed files with 55 additions and 8 deletions
+1
View File
@@ -1320,6 +1320,7 @@ class GlobalState(object):
for i in range(gcs_entries.EntriesLength()):
error_data = ray.gcs_utils.ErrorTableData.GetRootAsErrorTableData(
gcs_entries.Entries(i), 0)
assert job_id.id() == error_data.JobId()
error_message = {
"type": decode(error_data.Type()),
"message": decode(error_data.ErrorMessage()),
+6 -1
View File
@@ -58,10 +58,12 @@ TablePrefix_ERROR_INFO_string = "ERROR_INFO"
TablePrefix_PROFILE_string = "PROFILE"
def construct_error_message(error_type, message, timestamp):
def construct_error_message(driver_id, error_type, message, timestamp):
"""Construct a serialized ErrorTableData object.
Args:
driver_id: The ID of the driver that the error should go to. If this is
nil, then the error will go to all drivers.
error_type: The type of the error.
message: The error message.
timestamp: The time of the error.
@@ -70,10 +72,13 @@ def construct_error_message(error_type, message, timestamp):
The serialized object.
"""
builder = flatbuffers.Builder(0)
driver_offset = builder.CreateString(driver_id)
error_type_offset = builder.CreateString(error_type)
message_offset = builder.CreateString(message)
ray.core.generated.ErrorTableData.ErrorTableDataStart(builder)
ray.core.generated.ErrorTableData.ErrorTableDataAddJobId(
builder, driver_offset)
ray.core.generated.ErrorTableData.ErrorTableDataAddType(
builder, error_type_offset)
ray.core.generated.ErrorTableData.ErrorTableDataAddErrorMessage(
+18 -3
View File
@@ -8,6 +8,7 @@ import logging
import os
import time
from collections import Counter, defaultdict
import traceback
import redis
@@ -316,8 +317,9 @@ class Monitor(object):
if ip:
self.load_metrics.update(ip, static_resources, dynamic_resources)
else:
print("Warning: could not find ip for client {} in {}.".format(
client_id, self.local_scheduler_id_to_ip_map))
logger.warning(
"Warning: could not find ip for client {} in {}.".format(
client_id, self.local_scheduler_id_to_ip_map))
def xray_heartbeat_handler(self, unused_channel, data):
"""Handle an xray heartbeat message from Redis."""
@@ -809,4 +811,17 @@ if __name__ == "__main__":
autoscaling_config = None
monitor = Monitor(redis_ip_address, redis_port, autoscaling_config)
monitor.run()
try:
monitor.run()
except Exception as e:
# Something went wrong, so push an error to all drivers.
redis_client = redis.StrictRedis(
host=redis_ip_address, port=redis_port)
traceback_str = ray.utils.format_error_message(traceback.format_exc())
message = "The monitor failed with the following error:\n{}".format(
traceback_str)
ray.utils.push_error_to_driver_through_redis(
redis_client, monitor.use_raylet, ray_constants.MONITOR_DIED_ERROR,
message)
raise e
+1
View File
@@ -44,6 +44,7 @@ PUT_RECONSTRUCTION_PUSH_ERROR = "put_reconstruction"
HASH_MISMATCH_PUSH_ERROR = "object_hash_mismatch"
INFEASIBLE_TASK_ERROR = "infeasible_task"
REMOVED_NODE_ERROR = "node_removed"
MONITOR_DIED_ERROR = "monitor_died"
# Abort autoscaling if more than this number of errors are encountered. This
# is a safety feature to prevent e.g. runaway node launches.
+1 -1
View File
@@ -122,7 +122,7 @@ def push_error_to_driver_through_redis(redis_client,
# Do everything in Python and through the Python Redis client instead
# of through the raylet.
error_data = ray.gcs_utils.construct_error_message(
error_type, message, time.time())
driver_id, error_type, message, time.time())
redis_client.execute_command(
"RAY.TABLE_APPEND", ray.gcs_utils.TablePrefix.ERROR_INFO,
ray.gcs_utils.TablePubsub.ERROR_INFO, driver_id, error_data)
-1
View File
@@ -1724,7 +1724,6 @@ def init(redis_address=None,
redirect_worker_output=False,
redirect_output=True,
ignore_reinit_error=False,
num_custom_resource=None,
num_redis_shards=None,
redis_max_clients=None,
redis_protected_mode=True,