From 1fec94ef00e03c1228c4ad19b3d227fc366a723a Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Tue, 7 Feb 2017 14:21:25 -0800 Subject: [PATCH] Display drivers in web UI. (#252) * Display drivers in web UI. * Display more rows in grid and factor out function in webui backend. --- python/ray/services.py | 2 + python/ray/worker.py | 29 +++++++++++- webui/backend/ray_ui.py | 88 ++++++++++++++++++++++++++++++++++++- webui/src/ray-overview.html | 73 ++++++++++++++++++++++++++++++ 4 files changed, 189 insertions(+), 3 deletions(-) diff --git a/python/ray/services.py b/python/ray/services.py index 71aa106fe..1789f6a5f 100644 --- a/python/ray/services.py +++ b/python/ray/services.py @@ -243,6 +243,8 @@ def start_redis(port=None, num_retries=20, cleanup=True, redirect_output=False): # Configure Redis to not run in protected mode so that processes on other # hosts can connect to it. TODO(rkn): Do this in a more secure way. redis_client.config_set("protected-mode", "no") + # Put a time stamp in Redis to indicate when it was started. + redis_client.set("redis_start_time", time.time()) return port def start_global_scheduler(redis_address, cleanup=True, redirect_output=False): diff --git a/python/ray/worker.py b/python/ray/worker.py index aa19bd9a2..495f46ea8 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -883,6 +883,11 @@ def cleanup(worker=global_worker): services.cleanup() in the tests because we need to start and stop many clusters in the tests, but the import and exit only happen once. """ + # If this is a driver, push the finish time to Redis. + if worker.mode in [SCRIPT_MODE, SILENT_MODE]: + worker.redis_client.hmset(b"Drivers:" + worker.worker_id, + {"end_time": time.time()}) + disconnect(worker) worker.set_mode(None) worker.driver_export_counter = 0 @@ -893,6 +898,19 @@ def cleanup(worker=global_worker): atexit.register(cleanup) +# Define a custom excepthook so that if the driver exits with an exception, we +# can push that exception to Redis. +normal_excepthook = sys.excepthook +def custom_excepthook(type, value, tb): + # If this is a driver, push the exception to redis. + if global_worker.mode in [SCRIPT_MODE, SILENT_MODE]: + error_message = "".join(traceback.format_tb(tb)) + global_worker.redis_client.hmset(b"Drivers:" + global_worker.worker_id, + {"exception": error_message}) + # Call the normal excepthook. + normal_excepthook(type, value, traceback) +sys.excepthook = custom_excepthook + def print_error_messages(worker): """Print error messages in the background on the driver. @@ -1105,10 +1123,17 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker): worker.plasma_client = plasma.PlasmaClient(info["store_socket_name"], info["manager_socket_name"]) # Create the local scheduler client. worker.photon_client = photon.PhotonClient(info["local_scheduler_socket_name"]) - # Register the worker with Redis. if mode in [SCRIPT_MODE, SILENT_MODE]: - worker.redis_client.hmset(b"Drivers:" + worker.worker_id, {"node_ip_address": worker.node_ip_address}) + # The concept of a driver is the same as the concept of a "job". Register + # the driver/job with Redis here. + import __main__ as main + driver_info = {"node_ip_address": worker.node_ip_address, + "driver_id": worker.worker_id, + "start_time": time.time()} + driver_info["name"] = main.__file__ if hasattr(main, "__file__") else "INTERACTIVE MODE" + worker.redis_client.hmset(b"Drivers:" + worker.worker_id, driver_info) elif mode == WORKER_MODE: + # Register the worker with Redis. worker.redis_client.hmset(b"Workers:" + worker.worker_id, {"node_ip_address": worker.node_ip_address}) else: raise Exception("This code should be unreachable.") diff --git a/webui/backend/ray_ui.py b/webui/backend/ray_ui.py index d0dbb5d96..6c28f7348 100644 --- a/webui/backend/ray_ui.py +++ b/webui/backend/ray_ui.py @@ -3,9 +3,11 @@ import argparse import asyncio import binascii from collections import defaultdict +import datetime import json import numpy as np import redis +import time import websockets parser = argparse.ArgumentParser(description="parse information for the web ui") @@ -25,6 +27,10 @@ def identifier(hex_identifier): def key_to_hex_identifier(key): return hex_identifier(key[(key.index(b":") + 1):(key.index(b":") + IDENTIFIER_LENGTH + 1)]) +def timestamp_to_date_string(timestamp): + """Convert a time stamp returned by time.time() to a formatted string.""" + return datetime.datetime.fromtimestamp(timestamp).strftime("%Y/%m/%d %H:%M:%S") + def key_to_hex_identifiers(key): # Extract worker_id and task_id from key of the form prefix:worker_id:task_id. offset = key.index(b":") + 1 @@ -35,6 +41,80 @@ def key_to_hex_identifiers(key): worker_ids = [] +def duration_to_string(duration): + """Format a duration in seconds as a string. + + Args: + duration (float): The duration in seconds. + + Return: + A more human-readable version of the string (for example, "3.5 hours" or + "93 milliseconds"). + """ + if duration > 3600 * 24: + duration_str = "{0:0.1f} days".format(duration / (3600 * 24)) + elif duration > 3600: + duration_str = "{0:0.1f} hours".format(duration / 3600) + elif duration > 60: + duration_str = "{0:0.1f} minutes".format(duration / 60) + elif duration > 1: + duration_str = "{0:0.1f} seconds".format(duration) + else: + duration_str = "{} milliseconds".format(int(duration * 1000)) + return duration_str + +async def handle_get_statistics(websocket, redis_conn): + cluster_start_time = float(await redis_conn.execute("get", "redis_start_time")) + start_date = timestamp_to_date_string(cluster_start_time) + + uptime = duration_to_string(time.time() - cluster_start_time) + + client_keys = await redis_conn.execute("keys", "CL:*") + clients = [] + for client_key in client_keys: + client_fields = await redis_conn.execute("hgetall", client_key) + client_fields = {client_fields[2 * i]: client_fields[2 * i + 1] for i in range(len(client_fields) // 2)} + clients.append(client_fields) + ip_addresses = list(set([client[b"node_ip_address"].decode("ascii") for client in clients if client[b"client_type"] == b"photon"])) + num_nodes = len(ip_addresses) + reply = {"uptime": uptime, + "start_date": start_date, + "nodes": num_nodes, + "addresses": ip_addresses} + await websocket.send(json.dumps(reply)) + +async def handle_get_drivers(websocket, redis_conn): + keys = await redis_conn.execute("keys", "Drivers:*") + drivers = [] + for key in keys: + driver_fields = await redis_conn.execute("hgetall", key) + driver_fields = {driver_fields[2 * i]: driver_fields[2 * i + 1] for i in range(len(driver_fields) // 2)} + driver_info = {"node ip address": driver_fields[b"node_ip_address"].decode("ascii"), + "name": driver_fields[b"name"].decode("ascii")} + + driver_info["start time"] = timestamp_to_date_string(float(driver_fields[b"start_time"])) + + if b"end_time" in driver_fields: + duration = float(driver_fields[b"end_time"]) - float(driver_fields[b"start_time"]) + else: + duration = time.time() - float(driver_fields[b"start_time"]) + driver_info["duration"] = duration_to_string(duration) + + if b"exception" in driver_fields: + driver_info["status"] = "FAILED" + elif b"end_time" not in driver_fields: + driver_info["status"] = "IN PROGRESS" + else: + driver_info["status"] = "SUCCESS" + + if b"exception" in driver_fields: + driver_info["exception"] = driver_fields[b"exception"].decode("ascii") + + drivers.append(driver_info) + # Sort the drivers by their start times. + reply = sorted(drivers, key=(lambda driver: driver["start time"]))[::-1] + await websocket.send(json.dumps(reply)) + async def handle_get_recent_tasks(websocket, redis_conn, num_tasks): keys = await redis_conn.execute("keys", "event_log:*") if len(keys) == 0: @@ -85,11 +165,17 @@ async def serve_requests(websocket, path): redis_conn = await aioredis.create_connection((redis_ip_address, redis_port), loop=loop) # We loop infinitely because otherwise the websocket will be closed. + # TODO(rkn): Maybe we should open a new web sockets for every request instead + # of looping here. while True: command = json.loads(await websocket.recv()) print("received command {}".format(command)) - if command["command"] == "get-recent-tasks": + if command["command"] == "get-statistics": + await handle_get_statistics(websocket, redis_conn) + elif command["command"] == "get-drivers": + await handle_get_drivers(websocket, redis_conn) + elif command["command"] == "get-recent-tasks": await handle_get_recent_tasks(websocket, redis_conn, command["num"]) if command["command"] == "get-workers": diff --git a/webui/src/ray-overview.html b/webui/src/ray-overview.html index 7939981a7..4362afb3a 100644 --- a/webui/src/ray-overview.html +++ b/webui/src/ray-overview.html @@ -23,6 +23,24 @@ subject to an additional IP rights grant found at http://polymer.github.io/PATEN

Overview

+

Cluster Statistics

+
+
+
+ +

Drivers

+ + + + + + + + + +
+
+

Clients

@@ -33,6 +51,7 @@ subject to an additional IP rights grant found at http://polymer.github.io/PATEN
+

Workers

@@ -50,6 +69,60 @@ subject to an additional IP rights grant found at http://polymer.github.io/PATEN Polymer({ is: 'ray-overview', ready: function() { + var statisticsSocket = new WebSocket(backend_address); + var uptime = Polymer.dom(this.root).querySelector("#uptime"); + var nodes = Polymer.dom(this.root).querySelector("#nodes"); + var addresses = Polymer.dom(this.root).querySelector("#addresses"); + statisticsSocket.onopen = function() { + statisticsSocket.send(JSON.stringify({"command": "get-statistics"})) + } + statisticsSocket.onmessage = function(answer) { + var statistics = JSON.parse(answer.data); + uptime.innerHTML = "Total Uptime: " + statistics.uptime + " since " + statistics.start_date; + nodes.innerHTML = "Number of Machines: " + statistics.nodes; + addresses.innerHTML = "IP addresses: " + statistics.addresses; + } + + var driverSocket = new WebSocket(backend_address); + var drivers = Polymer.dom(this.root).querySelector("#drivers"); + driverSocket.onopen = function() { + driverSocket.send(JSON.stringify({"command": "get-drivers"})); + } + driverSocket.onmessage = function(answer) { + drivers.items = JSON.parse(answer.data); + drivers.visibleRows = drivers.items.length; + + // Add a row details generator + drivers.rowDetailsGenerator = function(rowIndex) { + var elem = document.createElement("pre"); + elem.setAttribute("class", "userdetailswrapper"); + elem.style.whiteSpace = "pre-wrap"; + + drivers.getItem(rowIndex, function(error, item) { + if (!error) { + if (item.exception) { + elem.innerHTML = item.exception + "\n\n"; + } + } + }); + + return elem; + }; + + var detailsOpenIndex = -1; + + // Show details for the selected row + drivers.addEventListener("selected-items-changed", function() { + drivers.setRowDetailsVisible(detailsOpenIndex, false); + var selected = drivers.selection.selected(); + if (selected.length == 1) { + drivers.setRowDetailsVisible(selected[0], true); + detailsOpenIndex = selected[0]; + } + }); + + } + var workerSocket = new WebSocket(backend_address); var workers = Polymer.dom(this.root).querySelector("#workers"); workerSocket.onopen = function() {