diff --git a/python/ray/dashboard/client/src/App.tsx b/python/ray/dashboard/client/src/App.tsx index b42c58c05..5ffaa1ba7 100644 --- a/python/ray/dashboard/client/src/App.tsx +++ b/python/ray/dashboard/client/src/App.tsx @@ -1,7 +1,9 @@ import CssBaseline from "@material-ui/core/CssBaseline"; import React from "react"; -import { BrowserRouter } from "react-router-dom"; +import { BrowserRouter, Route } from "react-router-dom"; import Dashboard from "./Dashboard"; +import Errors from "./Errors"; +import Logs from "./Logs"; class App extends React.Component { render() { @@ -9,6 +11,8 @@ class App extends React.Component { + + ); } diff --git a/python/ray/dashboard/client/src/Dashboard.tsx b/python/ray/dashboard/client/src/Dashboard.tsx index da87efcca..d4690f20d 100644 --- a/python/ray/dashboard/client/src/Dashboard.tsx +++ b/python/ray/dashboard/client/src/Dashboard.tsx @@ -12,10 +12,7 @@ import AddIcon from "@material-ui/icons/Add"; import RemoveIcon from "@material-ui/icons/Remove"; import classNames from "classnames"; import React from "react"; -import { Route } from "react-router"; import { Link as RouterLink } from "react-router-dom"; -import Errors from "./Errors"; -import Logs from "./Logs"; import UsageBar from "./UsageBar"; const formatByteAmount = (amount: number, unit: "mebibyte" | "gibibyte") => @@ -122,17 +119,15 @@ interface NodeInfo { memory_full_info: null; }>; }>; - logs: { + log_counts: { [ip: string]: { - [pid: string]: string[]; + [pid: string]: number; }; }; - errors: { - [jobId: string]: Array<{ - message: string; - timestamp: number; - type: string; - }>; + error_counts: { + [ip: string]: { + [pid: string]: number; + }; }; } @@ -143,7 +138,7 @@ interface State { } | null; error: string | null; expanded: { - [hostname: string]: boolean; + [ip: string]: boolean; }; } @@ -160,7 +155,7 @@ class Component extends React.Component, State> { "/api/node_info", process.env.NODE_ENV === "development" ? "http://localhost:8080" - : window.location.href + : window.location.origin ); const response = await fetch(url.toString()); const json = await response.json(); @@ -172,11 +167,11 @@ class Component extends React.Component, State> { } }; - toggleExpand = (hostname: string) => () => { + toggleExpand = (ip: string) => () => { this.setState(state => ({ expanded: { ...state.expanded, - [hostname]: !state.expanded[hostname] + [ip]: !state.expanded[ip] } })); }; @@ -208,7 +203,7 @@ class Component extends React.Component, State> { const { result, timestamp } = response; const logCounts: { - [hostname: string]: { + [ip: string]: { perWorker: { [pid: string]: number; }; @@ -217,7 +212,7 @@ class Component extends React.Component, State> { } = {}; const errorCounts: { - [hostname: string]: { + [ip: string]: { perWorker: { [pid: string]: number; }; @@ -226,54 +221,32 @@ class Component extends React.Component, State> { } = {}; for (const client of result.clients) { - logCounts[client.hostname] = { perWorker: {}, total: 0 }; - errorCounts[client.hostname] = { perWorker: {}, total: 0 }; + logCounts[client.ip] = { perWorker: {}, total: 0 }; + errorCounts[client.ip] = { perWorker: {}, total: 0 }; for (const worker of client.workers) { - logCounts[client.hostname].perWorker[worker.pid] = 0; - errorCounts[client.hostname].perWorker[worker.pid] = 0; + logCounts[client.ip].perWorker[worker.pid] = 0; + errorCounts[client.ip].perWorker[worker.pid] = 0; } } - for (const ip of Object.keys(result.logs)) { - let hostname: string | null = null; - for (const client of result.clients) { - if (ip === client.ip) { - hostname = client.hostname; - break; - } - } - if (hostname !== null) { - for (const pid of Object.keys(result.logs[ip])) { - const logCount = result.logs[ip][pid].length; - if (pid in logCounts[hostname].perWorker) { - logCounts[hostname].perWorker[pid] = logCount; - } - logCounts[hostname].total += logCount; + for (const ip of Object.keys(result.log_counts)) { + if (ip in logCounts) { + for (const [pid, count] of Object.entries(result.log_counts[ip])) { + logCounts[ip].perWorker[pid] = count; + logCounts[ip].total += count; } } } - for (const jobErrors of Object.values(result.errors)) { - for (const error of jobErrors) { - const match = error.message.match(/\(pid=(\d+), host=(.*?)\)/); - if (match !== null) { - const pid = match[1]; - const hostname = match[2]; - if (hostname in errorCounts) { - if (pid in errorCounts[hostname].perWorker) { - errorCounts[hostname].perWorker[pid]++; - } - errorCounts[hostname].total++; - } + for (const ip of Object.keys(result.error_counts)) { + if (ip in errorCounts) { + for (const [pid, count] of Object.entries(result.error_counts[ip])) { + errorCounts[ip].perWorker[pid] = count; + errorCounts[ip].total += count; } } } - const ipToHostname: { [ip: string]: string } = {}; - for (const client of result.clients) { - ipToHostname[client.ip] = client.hostname; - } - return (
Ray Dashboard @@ -281,7 +254,7 @@ class Component extends React.Component, State> { - Hostname + Host Workers Uptime CPU @@ -296,23 +269,23 @@ class Component extends React.Component, State> { {result.clients.map(client => { return ( - + - {!expanded[client.hostname] ? ( + {!expanded[client.ip] ? ( ) : ( )} - {client.hostname} + {client.hostname} ({client.ip}) {client.workers.length} @@ -356,7 +329,7 @@ class Component extends React.Component, State> { {/*{(client.net[0] / Math.pow(1024, 2)).toFixed(3)} MiB/s*/} {/*{(client.net[1] / Math.pow(1024, 2)).toFixed(3)} MiB/s*/} - {logCounts[client.hostname].total === 0 ? ( + {logCounts[client.ip].total === 0 ? ( No logs ) : ( , State> { to={`/logs/${client.hostname}`} > View all logs ( - {logCounts[client.hostname].total.toLocaleString()}{" "} - {logCounts[client.hostname].total === 1 - ? "line" - : "lines"} - ) + {logCounts[client.ip].total.toLocaleString()}{" "} + {logCounts[client.ip].total === 1 ? "line" : "lines"}) )} - {errorCounts[client.hostname].total === 0 ? ( + {errorCounts[client.ip].total === 0 ? ( No errors ) : ( , State> { to={`/errors/${client.hostname}`} > View all errors ( - {errorCounts[client.hostname].total.toLocaleString()}) + {errorCounts[client.ip].total.toLocaleString()}) )} - {expanded[client.hostname] && + {expanded[client.ip] && client.workers.map((worker, index: number) => ( @@ -425,8 +395,7 @@ class Component extends React.Component, State> { - {logCounts[client.hostname].perWorker[worker.pid] === - 0 ? ( + {logCounts[client.ip].perWorker[worker.pid] === 0 ? ( No logs ) : ( , State> { to={`/logs/${client.hostname}/${worker.pid}`} > View log ( - {logCounts[client.hostname].perWorker[ + {logCounts[client.ip].perWorker[ worker.pid ].toLocaleString()}{" "} - {logCounts[client.hostname].perWorker[ - worker.pid - ] === 1 + {logCounts[client.ip].perWorker[worker.pid] === 1 ? "line" : "lines"} ) @@ -447,9 +414,8 @@ class Component extends React.Component, State> { )} - {errorCounts[client.hostname].perWorker[ - worker.pid - ] === 0 ? ( + {errorCounts[client.ip].perWorker[worker.pid] === + 0 ? ( No errors ) : ( , State> { to={`/errors/${client.hostname}/${worker.pid}`} > View errors ( - {errorCounts[client.hostname].perWorker[ + {errorCounts[client.ip].perWorker[ worker.pid ].toLocaleString()} ) @@ -474,16 +440,6 @@ class Component extends React.Component, State> { Last updated: {new Date(timestamp * 1000).toLocaleString()} - ( - - )} - /> - } - />
); } diff --git a/python/ray/dashboard/client/src/Errors.tsx b/python/ray/dashboard/client/src/Errors.tsx index 2361b2dcc..189cdd853 100644 --- a/python/ray/dashboard/client/src/Errors.tsx +++ b/python/ray/dashboard/client/src/Errors.tsx @@ -1,5 +1,6 @@ import Dialog from "@material-ui/core/Dialog"; import IconButton from "@material-ui/core/IconButton"; +import { fade } from "@material-ui/core/styles/colorManipulator"; import { Theme } from "@material-ui/core/styles/createMuiTheme"; import createStyles from "@material-ui/core/styles/createStyles"; import withStyles, { WithStyles } from "@material-ui/core/styles/withStyles"; @@ -16,85 +17,88 @@ const styles = (theme: Theme) => }, closeButton: { position: "absolute", - right: theme.spacing(1), - top: theme.spacing(1), + right: theme.spacing(1.5), + top: theme.spacing(1.5), zIndex: 1 }, title: { borderBottomColor: theme.palette.divider, borderBottomStyle: "solid", borderBottomWidth: 1, + fontSize: "1.5rem", lineHeight: 1, marginBottom: theme.spacing(3), - paddingBottom: theme.spacing(3), - position: "relative", - "&:not(:first-of-type)": { - marginTop: theme.spacing(6) - } + paddingBottom: theme.spacing(3) + }, + header: { + lineHeight: 1, + marginBottom: theme.spacing(3), + marginTop: theme.spacing(3) }, error: { - "&:not(:last-child)": { - marginBottom: theme.spacing(3) - } + backgroundColor: fade(theme.palette.error.main, 0.06), + borderLeftColor: theme.palette.error.main, + borderLeftStyle: "solid", + borderLeftWidth: 3, + marginTop: theme.spacing(3), + padding: theme.spacing(2) }, timestamp: { + color: theme.palette.text.secondary, marginBottom: theme.spacing(1) } }); -interface Props { - errors: { - [jobId: string]: Array<{ +interface State { + result: { + [pid: string]: Array<{ message: string; timestamp: number; type: string; }>; - }; + } | null; + error: string | null; } class Component extends React.Component< - Props & - WithStyles & - RouteComponentProps<{ hostname: string; pid: string | undefined }> + WithStyles & + RouteComponentProps<{ hostname: string; pid: string | undefined }>, + State > { + state: State = { + result: null, + error: null + }; + handleClose = () => { this.props.history.push("/"); }; - render() { - const { classes, errors, match } = this.props; - const { hostname, pid } = match.params; - - let errorsForHost: { - [pid: string]: Array<{ - lines: string[]; - timestamp: number; - }>; - } = {}; - - for (const jobErrors of Object.values(errors)) { - for (const error of jobErrors) { - const match = error.message.match(/\(pid=(\d+), host=(.*?)\)/); - if (match !== null && match[2] === hostname) { - const pid = match[1]; - if (!(pid in errorsForHost)) { - errorsForHost[pid] = []; - } - errorsForHost[pid].push({ - lines: error.message - .replace(/\u001b\[\d+m/g, "") // eslint-disable-line no-control-regex - .trim() - .split("\n"), - timestamp: error.timestamp - }); - } - } + async componentDidMount() { + try { + const { match } = this.props; + const { hostname, pid } = match.params; + const url = new URL( + "/api/errors", + process.env.NODE_ENV === "development" + ? "http://localhost:8080" + : window.location.origin + ); + url.searchParams.set("hostname", hostname); + url.searchParams.set("pid", pid || ""); + const response = await fetch(url.toString()); + const json = await response.json(); + this.setState({ result: json.result, error: null }); + } catch (error) { + this.setState({ result: null, error: error.toString() }); } + } - const errorsToDisplay = - pid === undefined - ? errorsForHost - : { [pid]: pid in errorsForHost ? errorsForHost[pid] : [] }; + render() { + const { classes, match } = this.props; + const { result, error } = this.state; + + const { hostname } = match.params; return ( - {Object.entries(errorsToDisplay).map(([pid, errors]) => ( - - - {hostname} (PID: {pid}) - - {errors.length > 0 ? ( - errors.map(({ lines, timestamp }, index) => ( -
- - Error at {new Date(timestamp * 1000).toLocaleString()} - - -
- )) - ) : ( - No errors found. - )} -
- ))} + Errors + {error !== null ? ( + {error} + ) : result === null ? ( + Loading... + ) : ( + Object.entries(result).map(([pid, errors]) => ( + + + {hostname} (PID: {pid}) + + {errors.length > 0 ? ( + errors.map(({ message, timestamp }, index) => ( +
+ + Error at {new Date(timestamp * 1000).toLocaleString()} + + +
+ )) + ) : ( + No errors found. + )} +
+ )) + )}
); } diff --git a/python/ray/dashboard/client/src/Logs.tsx b/python/ray/dashboard/client/src/Logs.tsx index e74b35573..5f5625f52 100644 --- a/python/ray/dashboard/client/src/Logs.tsx +++ b/python/ray/dashboard/client/src/Logs.tsx @@ -16,65 +16,70 @@ const styles = (theme: Theme) => }, closeButton: { position: "absolute", - right: theme.spacing(1), - top: theme.spacing(1), + right: theme.spacing(1.5), + top: theme.spacing(1.5), zIndex: 1 }, title: { borderBottomColor: theme.palette.divider, borderBottomStyle: "solid", borderBottomWidth: 1, + fontSize: "1.5rem", lineHeight: 1, marginBottom: theme.spacing(3), - paddingBottom: theme.spacing(3), - position: "relative", - "&:not(:first-of-type)": { - marginTop: theme.spacing(3) - } + paddingBottom: theme.spacing(3) + }, + header: { + lineHeight: 1, + marginBottom: theme.spacing(3), + marginTop: theme.spacing(3) } }); -interface Props { - ipToHostname: { - [ip: string]: string; - }; - logs: { - [ip: string]: { - [pid: string]: string[]; - }; - }; +interface State { + result: { [pid: string]: string[] } | null; + error: string | null; } class Component extends React.Component< - Props & - WithStyles & - RouteComponentProps<{ hostname: string; pid: string | undefined }> + WithStyles & + RouteComponentProps<{ hostname: string; pid: string | undefined }>, + State > { + state: State = { + result: null, + error: null + }; + handleClose = () => { this.props.history.push("/"); }; - render() { - const { classes, ipToHostname, logs, match } = this.props; - const { hostname, pid } = match.params; - - let logsForHost: { - [pid: string]: string[]; - } = {}; - - for (const ip of Object.keys(ipToHostname)) { - if (ipToHostname[ip] === hostname) { - if (ip in logs) { - logsForHost = logs[ip]; - } - break; - } + async componentDidMount() { + try { + const { match } = this.props; + const { hostname, pid } = match.params; + const url = new URL( + "/api/logs", + process.env.NODE_ENV === "development" + ? "http://localhost:8080" + : window.location.origin + ); + url.searchParams.set("hostname", hostname); + url.searchParams.set("pid", pid || ""); + const response = await fetch(url.toString()); + const json = await response.json(); + this.setState({ result: json.result, error: null }); + } catch (error) { + this.setState({ result: null, error: error.toString() }); } + } - const logsToDisplay = - pid === undefined - ? logsForHost - : { [pid]: pid in logsForHost ? logsForHost[pid] : [] }; + render() { + const { classes, match } = this.props; + const { result, error } = this.state; + + const { hostname } = match.params; return ( - {Object.entries(logsToDisplay).map(([pid, lines]) => ( - - - {hostname} (PID: {pid}) - - {lines.length > 0 ? ( - - ) : ( - No logs found. - )} - - ))} + Logs + {error !== null ? ( + {error} + ) : result === null ? ( + Loading... + ) : ( + Object.entries(result).map(([pid, lines]) => ( + + + {hostname} (PID: {pid}) + + {lines.length > 0 ? ( + + ) : ( + No logs found. + )} + + )) + )} ); } diff --git a/python/ray/dashboard/dashboard.py b/python/ray/dashboard/dashboard.py index 48d234a1d..775765204 100644 --- a/python/ray/dashboard/dashboard.py +++ b/python/ray/dashboard/dashboard.py @@ -14,6 +14,7 @@ import datetime import json import logging import os +import re import threading import traceback import yaml @@ -165,6 +166,18 @@ class Dashboard(object): D = self.node_stats.get_node_stats() return await json_response(result=D, ts=now) + async def logs(req) -> aiohttp.web.Response: + hostname = req.query.get("hostname") + pid = req.query.get("pid") + result = self.node_stats.get_logs(hostname, pid) + return await json_response(result=result) + + async def errors(req) -> aiohttp.web.Response: + hostname = req.query.get("hostname") + pid = req.query.get("pid") + result = self.node_stats.get_errors(hostname, pid) + return await json_response(result=result) + self.app.router.add_get("/", get_index) static_dir = os.path.join( @@ -176,8 +189,10 @@ class Dashboard(object): "required to build the dashboard.".format(static_dir)) self.app.router.add_static("/static", static_dir) - self.app.router.add_get("/api/node_info", node_info) self.app.router.add_get("/api/ray_config", ray_config) + self.app.router.add_get("/api/node_info", node_info) + self.app.router.add_get("/api/logs", logs) + self.app.router.add_get("/api/errors", errors) self.app.router.add_get("/{_}", get_forbidden) @@ -205,6 +220,9 @@ class NodeStats(threading.Thread): # Mapping from IP address to PID to list of log lines self._logs = defaultdict(lambda: defaultdict(list)) + # Mapping from IP address to PID to list of error messages + self._errors = defaultdict(lambda: defaultdict(list)) + ray.init(redis_address=redis_address, redis_password=redis_password) super().__init__() @@ -254,6 +272,24 @@ class NodeStats(threading.Thread): for y in (v["workers"] for v in self._node_stats.values()) for x in y)) + def calculate_log_counts(self): + return { + ip: { + pid: len(logs_for_pid) + for pid, logs_for_pid in logs_for_ip.items() + } + for ip, logs_for_ip in self._logs.items() + } + + def calculate_error_counts(self): + return { + ip: { + pid: len(errors_for_pid) + for pid, errors_for_pid in errors_for_ip.items() + } + for ip, errors_for_ip in self._errors.items() + } + def purge_outdated_stats(self): def current(then, now): if (now - then) > 5: @@ -277,10 +313,24 @@ class NodeStats(threading.Thread): "totals": self.calculate_totals(), "tasks": self.calculate_tasks(), "clients": node_stats, - "logs": self._logs, - "errors": ray.errors(all_jobs=True), + "log_counts": self.calculate_log_counts(), + "error_counts": self.calculate_error_counts(), } + def get_logs(self, hostname, pid): + ip = self._node_stats.get(hostname, {"ip": None})["ip"] + logs = self._logs.get(ip, {}) + if pid: + logs = {pid: logs.get(pid, [])} + return logs + + def get_errors(self, hostname, pid): + ip = self._node_stats.get(hostname, {"ip": None})["ip"] + errors = self._errors.get(ip, {}) + if pid: + errors = {pid: errors.get(pid, [])} + return errors + def run(self): p = self.redis_client.pubsub(ignore_subscribe_messages=True) @@ -291,16 +341,38 @@ class NodeStats(threading.Thread): p.subscribe(log_channel) logger.info("NodeStats: subscribed to {}".format(log_channel)) + error_channel = ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB") + p.subscribe(error_channel) + logger.info("NodeStats: subscribed to {}".format(error_channel)) + for x in p.listen(): try: with self._node_stats_lock: channel = ray.utils.decode(x["channel"]) + data = x["data"] if channel == log_channel: - D = json.loads(ray.utils.decode(x["data"])) - self._logs[D["ip"]][D["pid"]].extend(D["lines"]) + data = json.loads(ray.utils.decode(data)) + ip = data["ip"] + pid = str(data["pid"]) + self._logs[ip][pid].extend(data["lines"]) + elif channel == str(error_channel): + gcs_entry = ray.gcs_utils.GcsEntry.FromString(data) + error_data = ray.gcs_utils.ErrorTableData.FromString( + gcs_entry.entries[0]) + message = error_data.error_message + message = re.sub(r"\x1b\[\d+m", "", message) + match = re.search(r"\(pid=(\d+), ip=(.*?)\)", message) + if match: + pid = match.group(1) + ip = match.group(2) + self._errors[ip][pid].append({ + "message": message, + "timestamp": error_data.timestamp, + "type": error_data.type + }) else: - D = json.loads(ray.utils.decode(x["data"])) - self._node_stats[D["hostname"]] = D + data = json.loads(ray.utils.decode(data)) + self._node_stats[data["hostname"]] = data except Exception: logger.exception(traceback.format_exc()) continue diff --git a/python/ray/exceptions.py b/python/ray/exceptions.py index ba0c2e5d6..0d36e192f 100644 --- a/python/ray/exceptions.py +++ b/python/ray/exceptions.py @@ -7,6 +7,8 @@ try: except ImportError: setproctitle = None +import ray + class RayError(Exception): """Super class of all ray exception types.""" @@ -33,14 +35,14 @@ class RayTaskError(RayError): traceback_str, cause_cls, pid=None, - host=None): + ip=None): """Initialize a RayTaskError.""" if setproctitle: self.proctitle = setproctitle.getproctitle() else: self.proctitle = "ray_worker" self.pid = pid or os.getpid() - self.host = host or os.uname()[1] + self.ip = ip or ray.services.get_node_ip_address() self.function_name = function_name self.traceback_str = traceback_str self.cause_cls = cause_cls @@ -67,7 +69,7 @@ class RayTaskError(RayError): cls.__qualname__ = name return cls(self.function_name, self.traceback_str, self.cause_cls, - self.pid, self.host) + self.pid, self.ip) cls.original = self return cls @@ -78,9 +80,9 @@ class RayTaskError(RayError): in_worker = False for line in lines: if line.startswith("Traceback "): - out.append("{}{}{} (pid={}, host={})".format( + out.append("{}{}{} (pid={}, ip={})".format( colorama.Fore.CYAN, self.proctitle, colorama.Fore.RESET, - self.pid, self.host)) + self.pid, self.ip)) elif in_worker: in_worker = False elif "ray/worker.py" in line or "ray/function_manager.py" in line: diff --git a/python/ray/reporter.py b/python/ray/reporter.py index 9cb12959c..220fc580e 100644 --- a/python/ray/reporter.py +++ b/python/ray/reporter.py @@ -9,7 +9,6 @@ import os import traceback import time import datetime -from socket import AddressFamily try: import psutil @@ -19,6 +18,7 @@ except ImportError: sys.exit(1) import ray.ray_constants as ray_constants +import ray.services import ray.utils # Logger for this module. It should be configured at the entry point @@ -52,15 +52,6 @@ def is_worker(cmdline): return cmdline and cmdline[0].startswith("ray_") -def determine_ip_address(): - """Return the first IP address for an ethernet interface on the system.""" - addrs = [ - x.address for k, v in psutil.net_if_addrs().items() if k[0] == "e" - for x in v if x.family == AddressFamily.AF_INET - ] - return addrs[0] - - def to_posix_time(dt): return (dt - datetime.datetime(1970, 1, 1)).total_seconds() @@ -77,7 +68,7 @@ class Reporter(object): def __init__(self, redis_address, redis_password=None): """Initialize the reporter object.""" self.cpu_counts = (psutil.cpu_count(), psutil.cpu_count(logical=False)) - self.ip_addr = determine_ip_address() + self.ip = ray.services.get_node_ip_address() self.hostname = os.uname().nodename _ = psutil.cpu_percent() # For initialization @@ -145,7 +136,7 @@ class Reporter(object): return { "now": now, "hostname": self.hostname, - "ip": self.ip_addr, + "ip": self.ip, "cpu": self.get_cpu_percent(), "cpus": self.cpu_counts, "mem": self.get_mem_usage(),