[Dashboard] Improve handling of logs and errors in dashboard backend (#5857)

* Improve handling of logs and errors in dashboard backend

* Update nested dict comprehension for clarity
This commit is contained in:
Mitchell Stern
2019-10-10 11:59:54 -07:00
committed by Eric Liang
parent 1a8ac3db46
commit 195ca43e9c
7 changed files with 278 additions and 230 deletions
+5 -1
View File
@@ -1,7 +1,9 @@
import CssBaseline from "@material-ui/core/CssBaseline";
import React from "react";
import { BrowserRouter } from "react-router-dom";
import { BrowserRouter, Route } from "react-router-dom";
import Dashboard from "./Dashboard";
import Errors from "./Errors";
import Logs from "./Logs";
class App extends React.Component {
render() {
@@ -9,6 +11,8 @@ class App extends React.Component {
<BrowserRouter>
<CssBaseline />
<Dashboard />
<Route component={Logs} path="/logs/:hostname/:pid?" />
<Route component={Errors} path="/errors/:hostname/:pid?" />
</BrowserRouter>
);
}
+43 -87
View File
@@ -12,10 +12,7 @@ import AddIcon from "@material-ui/icons/Add";
import RemoveIcon from "@material-ui/icons/Remove";
import classNames from "classnames";
import React from "react";
import { Route } from "react-router";
import { Link as RouterLink } from "react-router-dom";
import Errors from "./Errors";
import Logs from "./Logs";
import UsageBar from "./UsageBar";
const formatByteAmount = (amount: number, unit: "mebibyte" | "gibibyte") =>
@@ -122,17 +119,15 @@ interface NodeInfo {
memory_full_info: null;
}>;
}>;
logs: {
log_counts: {
[ip: string]: {
[pid: string]: string[];
[pid: string]: number;
};
};
errors: {
[jobId: string]: Array<{
message: string;
timestamp: number;
type: string;
}>;
error_counts: {
[ip: string]: {
[pid: string]: number;
};
};
}
@@ -143,7 +138,7 @@ interface State {
} | null;
error: string | null;
expanded: {
[hostname: string]: boolean;
[ip: string]: boolean;
};
}
@@ -160,7 +155,7 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
"/api/node_info",
process.env.NODE_ENV === "development"
? "http://localhost:8080"
: window.location.href
: window.location.origin
);
const response = await fetch(url.toString());
const json = await response.json();
@@ -172,11 +167,11 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
}
};
toggleExpand = (hostname: string) => () => {
toggleExpand = (ip: string) => () => {
this.setState(state => ({
expanded: {
...state.expanded,
[hostname]: !state.expanded[hostname]
[ip]: !state.expanded[ip]
}
}));
};
@@ -208,7 +203,7 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
const { result, timestamp } = response;
const logCounts: {
[hostname: string]: {
[ip: string]: {
perWorker: {
[pid: string]: number;
};
@@ -217,7 +212,7 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
} = {};
const errorCounts: {
[hostname: string]: {
[ip: string]: {
perWorker: {
[pid: string]: number;
};
@@ -226,54 +221,32 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
} = {};
for (const client of result.clients) {
logCounts[client.hostname] = { perWorker: {}, total: 0 };
errorCounts[client.hostname] = { perWorker: {}, total: 0 };
logCounts[client.ip] = { perWorker: {}, total: 0 };
errorCounts[client.ip] = { perWorker: {}, total: 0 };
for (const worker of client.workers) {
logCounts[client.hostname].perWorker[worker.pid] = 0;
errorCounts[client.hostname].perWorker[worker.pid] = 0;
logCounts[client.ip].perWorker[worker.pid] = 0;
errorCounts[client.ip].perWorker[worker.pid] = 0;
}
}
for (const ip of Object.keys(result.logs)) {
let hostname: string | null = null;
for (const client of result.clients) {
if (ip === client.ip) {
hostname = client.hostname;
break;
}
}
if (hostname !== null) {
for (const pid of Object.keys(result.logs[ip])) {
const logCount = result.logs[ip][pid].length;
if (pid in logCounts[hostname].perWorker) {
logCounts[hostname].perWorker[pid] = logCount;
}
logCounts[hostname].total += logCount;
for (const ip of Object.keys(result.log_counts)) {
if (ip in logCounts) {
for (const [pid, count] of Object.entries(result.log_counts[ip])) {
logCounts[ip].perWorker[pid] = count;
logCounts[ip].total += count;
}
}
}
for (const jobErrors of Object.values(result.errors)) {
for (const error of jobErrors) {
const match = error.message.match(/\(pid=(\d+), host=(.*?)\)/);
if (match !== null) {
const pid = match[1];
const hostname = match[2];
if (hostname in errorCounts) {
if (pid in errorCounts[hostname].perWorker) {
errorCounts[hostname].perWorker[pid]++;
}
errorCounts[hostname].total++;
}
for (const ip of Object.keys(result.error_counts)) {
if (ip in errorCounts) {
for (const [pid, count] of Object.entries(result.error_counts[ip])) {
errorCounts[ip].perWorker[pid] = count;
errorCounts[ip].total += count;
}
}
}
const ipToHostname: { [ip: string]: string } = {};
for (const client of result.clients) {
ipToHostname[client.ip] = client.hostname;
}
return (
<div className={classes.root}>
<Typography variant="h5">Ray Dashboard</Typography>
@@ -281,7 +254,7 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
<TableHead>
<TableRow>
<TableCell className={classes.cell} />
<TableCell className={classes.cell}>Hostname</TableCell>
<TableCell className={classes.cell}>Host</TableCell>
<TableCell className={classes.cell}>Workers</TableCell>
<TableCell className={classes.cell}>Uptime</TableCell>
<TableCell className={classes.cell}>CPU</TableCell>
@@ -296,23 +269,23 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
<TableBody>
{result.clients.map(client => {
return (
<React.Fragment key={client.hostname}>
<React.Fragment key={client.ip}>
<TableRow hover>
<TableCell
className={classNames(
classes.cell,
classes.expandCollapseCell
)}
onClick={this.toggleExpand(client.hostname)}
onClick={this.toggleExpand(client.ip)}
>
{!expanded[client.hostname] ? (
{!expanded[client.ip] ? (
<AddIcon className={classes.expandCollapseIcon} />
) : (
<RemoveIcon className={classes.expandCollapseIcon} />
)}
</TableCell>
<TableCell className={classes.cell}>
{client.hostname}
{client.hostname} ({client.ip})
</TableCell>
<TableCell className={classes.cell}>
{client.workers.length}
@@ -356,7 +329,7 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
{/*<TableCell className={classes.cell}>{(client.net[0] / Math.pow(1024, 2)).toFixed(3)} MiB/s</TableCell>*/}
{/*<TableCell className={classes.cell}>{(client.net[1] / Math.pow(1024, 2)).toFixed(3)} MiB/s</TableCell>*/}
<TableCell className={classes.cell}>
{logCounts[client.hostname].total === 0 ? (
{logCounts[client.ip].total === 0 ? (
<span className={classes.secondary}>No logs</span>
) : (
<Link
@@ -364,16 +337,13 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
to={`/logs/${client.hostname}`}
>
View all logs (
{logCounts[client.hostname].total.toLocaleString()}{" "}
{logCounts[client.hostname].total === 1
? "line"
: "lines"}
)
{logCounts[client.ip].total.toLocaleString()}{" "}
{logCounts[client.ip].total === 1 ? "line" : "lines"})
</Link>
)}
</TableCell>
<TableCell className={classes.cell}>
{errorCounts[client.hostname].total === 0 ? (
{errorCounts[client.ip].total === 0 ? (
<span className={classes.secondary}>No errors</span>
) : (
<Link
@@ -381,12 +351,12 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
to={`/errors/${client.hostname}`}
>
View all errors (
{errorCounts[client.hostname].total.toLocaleString()})
{errorCounts[client.ip].total.toLocaleString()})
</Link>
)}
</TableCell>
</TableRow>
{expanded[client.hostname] &&
{expanded[client.ip] &&
client.workers.map((worker, index: number) => (
<TableRow hover key={index}>
<TableCell className={classes.cell} />
@@ -425,8 +395,7 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
</span>
</TableCell>
<TableCell className={classes.cell}>
{logCounts[client.hostname].perWorker[worker.pid] ===
0 ? (
{logCounts[client.ip].perWorker[worker.pid] === 0 ? (
<span className={classes.secondary}>No logs</span>
) : (
<Link
@@ -434,12 +403,10 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
to={`/logs/${client.hostname}/${worker.pid}`}
>
View log (
{logCounts[client.hostname].perWorker[
{logCounts[client.ip].perWorker[
worker.pid
].toLocaleString()}{" "}
{logCounts[client.hostname].perWorker[
worker.pid
] === 1
{logCounts[client.ip].perWorker[worker.pid] === 1
? "line"
: "lines"}
)
@@ -447,9 +414,8 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
)}
</TableCell>
<TableCell className={classes.cell}>
{errorCounts[client.hostname].perWorker[
worker.pid
] === 0 ? (
{errorCounts[client.ip].perWorker[worker.pid] ===
0 ? (
<span className={classes.secondary}>No errors</span>
) : (
<Link
@@ -457,7 +423,7 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
to={`/errors/${client.hostname}/${worker.pid}`}
>
View errors (
{errorCounts[client.hostname].perWorker[
{errorCounts[client.ip].perWorker[
worker.pid
].toLocaleString()}
)
@@ -474,16 +440,6 @@ class Component extends React.Component<WithStyles<typeof styles>, State> {
<Typography align="center">
Last updated: {new Date(timestamp * 1000).toLocaleString()}
</Typography>
<Route
path="/logs/:hostname/:pid?"
render={props => (
<Logs {...props} ipToHostname={ipToHostname} logs={result.logs} />
)}
/>
<Route
path="/errors/:hostname/:pid?"
render={props => <Errors {...props} errors={result.errors} />}
/>
</div>
);
}
+79 -68
View File
@@ -1,5 +1,6 @@
import Dialog from "@material-ui/core/Dialog";
import IconButton from "@material-ui/core/IconButton";
import { fade } from "@material-ui/core/styles/colorManipulator";
import { Theme } from "@material-ui/core/styles/createMuiTheme";
import createStyles from "@material-ui/core/styles/createStyles";
import withStyles, { WithStyles } from "@material-ui/core/styles/withStyles";
@@ -16,85 +17,88 @@ const styles = (theme: Theme) =>
},
closeButton: {
position: "absolute",
right: theme.spacing(1),
top: theme.spacing(1),
right: theme.spacing(1.5),
top: theme.spacing(1.5),
zIndex: 1
},
title: {
borderBottomColor: theme.palette.divider,
borderBottomStyle: "solid",
borderBottomWidth: 1,
fontSize: "1.5rem",
lineHeight: 1,
marginBottom: theme.spacing(3),
paddingBottom: theme.spacing(3),
position: "relative",
"&:not(:first-of-type)": {
marginTop: theme.spacing(6)
}
paddingBottom: theme.spacing(3)
},
header: {
lineHeight: 1,
marginBottom: theme.spacing(3),
marginTop: theme.spacing(3)
},
error: {
"&:not(:last-child)": {
marginBottom: theme.spacing(3)
}
backgroundColor: fade(theme.palette.error.main, 0.06),
borderLeftColor: theme.palette.error.main,
borderLeftStyle: "solid",
borderLeftWidth: 3,
marginTop: theme.spacing(3),
padding: theme.spacing(2)
},
timestamp: {
color: theme.palette.text.secondary,
marginBottom: theme.spacing(1)
}
});
interface Props {
errors: {
[jobId: string]: Array<{
interface State {
result: {
[pid: string]: Array<{
message: string;
timestamp: number;
type: string;
}>;
};
} | null;
error: string | null;
}
class Component extends React.Component<
Props &
WithStyles<typeof styles> &
RouteComponentProps<{ hostname: string; pid: string | undefined }>
WithStyles<typeof styles> &
RouteComponentProps<{ hostname: string; pid: string | undefined }>,
State
> {
state: State = {
result: null,
error: null
};
// Dismisses the dialog by navigating back to the root route ("/").
handleClose = () => {
this.props.history.push("/");
};
render() {
const { classes, errors, match } = this.props;
const { hostname, pid } = match.params;
let errorsForHost: {
[pid: string]: Array<{
lines: string[];
timestamp: number;
}>;
} = {};
for (const jobErrors of Object.values(errors)) {
for (const error of jobErrors) {
const match = error.message.match(/\(pid=(\d+), host=(.*?)\)/);
if (match !== null && match[2] === hostname) {
const pid = match[1];
if (!(pid in errorsForHost)) {
errorsForHost[pid] = [];
}
errorsForHost[pid].push({
lines: error.message
.replace(/\u001b\[\d+m/g, "") // eslint-disable-line no-control-regex
.trim()
.split("\n"),
timestamp: error.timestamp
});
}
}
/**
 * Requests the errors for the host (and optional worker PID) named in the
 * route parameters from `/api/errors` and stores the outcome in state.
 *
 * Every failure mode (network error, non-2xx status, unparsable body or a
 * response without a `result`) ends up in `state.error`, so `render` shows a
 * message instead of crashing or staying on "Loading..." forever.
 */
async componentDidMount() {
  try {
    const { hostname, pid } = this.props.match.params;
    const url = new URL(
      "/api/errors",
      process.env.NODE_ENV === "development"
        ? "http://localhost:8080"
        : window.location.origin
    );
    url.searchParams.set("hostname", hostname);
    // An empty pid asks the backend for every worker on the host.
    url.searchParams.set("pid", pid || "");
    const response = await fetch(url.toString());
    if (!response.ok) {
      // fetch() only rejects on network failure; without this check an HTTP
      // error page would be handed to response.json() and surface as an
      // opaque JSON parse error.
      throw new Error(
        `Request failed with status ${response.status} ${response.statusText}`
      );
    }
    const json = await response.json();
    if (json.result === null || json.result === undefined) {
      // render() iterates over the result with Object.entries, so a missing
      // result must be reported as an error rather than stored.
      throw new Error("The server response did not contain a result");
    }
    this.setState({ result: json.result, error: null });
  } catch (error) {
    // String() is safe for any thrown value, not only Error instances.
    this.setState({ result: null, error: String(error) });
  }
}
const errorsToDisplay =
pid === undefined
? errorsForHost
: { [pid]: pid in errorsForHost ? errorsForHost[pid] : [] };
render() {
const { classes, match } = this.props;
const { result, error } = this.state;
const { hostname } = match.params;
return (
<Dialog
@@ -108,25 +112,32 @@ class Component extends React.Component<
<IconButton className={classes.closeButton} onClick={this.handleClose}>
<CloseIcon />
</IconButton>
{Object.entries(errorsToDisplay).map(([pid, errors]) => (
<React.Fragment key={pid}>
<Typography className={classes.title}>
{hostname} (PID: {pid})
</Typography>
{errors.length > 0 ? (
errors.map(({ lines, timestamp }, index) => (
<div className={classes.error} key={index}>
<Typography className={classes.timestamp}>
Error at {new Date(timestamp * 1000).toLocaleString()}
</Typography>
<NumberedLines lines={lines} />
</div>
))
) : (
<Typography color="textSecondary">No errors found.</Typography>
)}
</React.Fragment>
))}
<Typography className={classes.title}>Errors</Typography>
{error !== null ? (
<Typography color="error">{error}</Typography>
) : result === null ? (
<Typography color="textSecondary">Loading...</Typography>
) : (
Object.entries(result).map(([pid, errors]) => (
<React.Fragment key={pid}>
<Typography className={classes.header}>
{hostname} (PID: {pid})
</Typography>
{errors.length > 0 ? (
errors.map(({ message, timestamp }, index) => (
<div className={classes.error} key={index}>
<Typography className={classes.timestamp}>
Error at {new Date(timestamp * 1000).toLocaleString()}
</Typography>
<NumberedLines lines={message.trim().split("\n")} />
</div>
))
) : (
<Typography color="textSecondary">No errors found.</Typography>
)}
</React.Fragment>
))
)}
</Dialog>
);
}
+62 -50
View File
@@ -16,65 +16,70 @@ const styles = (theme: Theme) =>
},
closeButton: {
position: "absolute",
right: theme.spacing(1),
top: theme.spacing(1),
right: theme.spacing(1.5),
top: theme.spacing(1.5),
zIndex: 1
},
title: {
borderBottomColor: theme.palette.divider,
borderBottomStyle: "solid",
borderBottomWidth: 1,
fontSize: "1.5rem",
lineHeight: 1,
marginBottom: theme.spacing(3),
paddingBottom: theme.spacing(3),
position: "relative",
"&:not(:first-of-type)": {
marginTop: theme.spacing(3)
}
paddingBottom: theme.spacing(3)
},
header: {
lineHeight: 1,
marginBottom: theme.spacing(3),
marginTop: theme.spacing(3)
}
});
interface Props {
ipToHostname: {
[ip: string]: string;
};
logs: {
[ip: string]: {
[pid: string]: string[];
};
};
interface State {
result: { [pid: string]: string[] } | null;
error: string | null;
}
class Component extends React.Component<
Props &
WithStyles<typeof styles> &
RouteComponentProps<{ hostname: string; pid: string | undefined }>
WithStyles<typeof styles> &
RouteComponentProps<{ hostname: string; pid: string | undefined }>,
State
> {
state: State = {
result: null,
error: null
};
// Dismisses the dialog by navigating back to the root route ("/").
handleClose = () => {
this.props.history.push("/");
};
render() {
const { classes, ipToHostname, logs, match } = this.props;
const { hostname, pid } = match.params;
let logsForHost: {
[pid: string]: string[];
} = {};
for (const ip of Object.keys(ipToHostname)) {
if (ipToHostname[ip] === hostname) {
if (ip in logs) {
logsForHost = logs[ip];
}
break;
}
/**
 * Requests the log lines for the host (and optional worker PID) named in the
 * route parameters from `/api/logs` and stores the outcome in state.
 *
 * Every failure mode (network error, non-2xx status, unparsable body or a
 * response without a `result`) ends up in `state.error`, so `render` shows a
 * message instead of crashing or staying on "Loading..." forever.
 */
async componentDidMount() {
  try {
    const { hostname, pid } = this.props.match.params;
    const url = new URL(
      "/api/logs",
      process.env.NODE_ENV === "development"
        ? "http://localhost:8080"
        : window.location.origin
    );
    url.searchParams.set("hostname", hostname);
    // An empty pid asks the backend for every worker on the host.
    url.searchParams.set("pid", pid || "");
    const response = await fetch(url.toString());
    if (!response.ok) {
      // fetch() only rejects on network failure; without this check an HTTP
      // error page would be handed to response.json() and surface as an
      // opaque JSON parse error.
      throw new Error(
        `Request failed with status ${response.status} ${response.statusText}`
      );
    }
    const json = await response.json();
    if (json.result === null || json.result === undefined) {
      // render() iterates over the result with Object.entries, so a missing
      // result must be reported as an error rather than stored.
      throw new Error("The server response did not contain a result");
    }
    this.setState({ result: json.result, error: null });
  } catch (error) {
    // String() is safe for any thrown value, not only Error instances.
    this.setState({ result: null, error: String(error) });
  }
}
const logsToDisplay =
pid === undefined
? logsForHost
: { [pid]: pid in logsForHost ? logsForHost[pid] : [] };
render() {
const { classes, match } = this.props;
const { result, error } = this.state;
const { hostname } = match.params;
return (
<Dialog
@@ -88,18 +93,25 @@ class Component extends React.Component<
<IconButton className={classes.closeButton} onClick={this.handleClose}>
<CloseIcon />
</IconButton>
{Object.entries(logsToDisplay).map(([pid, lines]) => (
<React.Fragment key={pid}>
<Typography className={classes.title}>
{hostname} (PID: {pid})
</Typography>
{lines.length > 0 ? (
<NumberedLines lines={lines} />
) : (
<Typography color="textSecondary">No logs found.</Typography>
)}
</React.Fragment>
))}
<Typography className={classes.title}>Logs</Typography>
{error !== null ? (
<Typography color="error">{error}</Typography>
) : result === null ? (
<Typography color="textSecondary">Loading...</Typography>
) : (
Object.entries(result).map(([pid, lines]) => (
<React.Fragment key={pid}>
<Typography className={classes.header}>
{hostname} (PID: {pid})
</Typography>
{lines.length > 0 ? (
<NumberedLines lines={lines} />
) : (
<Typography color="textSecondary">No logs found.</Typography>
)}
</React.Fragment>
))
)}
</Dialog>
);
}
+79 -7
View File
@@ -14,6 +14,7 @@ import datetime
import json
import logging
import os
import re
import threading
import traceback
import yaml
@@ -165,6 +166,18 @@ class Dashboard(object):
D = self.node_stats.get_node_stats()
return await json_response(result=D, ts=now)
async def logs(req) -> aiohttp.web.Response:
    """Serve the log lines selected by the `hostname` and `pid` query parameters."""
    query = req.query
    selected_logs = self.node_stats.get_logs(
        query.get("hostname"), query.get("pid"))
    return await json_response(result=selected_logs)
async def errors(req) -> aiohttp.web.Response:
    """Serve the errors selected by the `hostname` and `pid` query parameters."""
    query = req.query
    selected_errors = self.node_stats.get_errors(
        query.get("hostname"), query.get("pid"))
    return await json_response(result=selected_errors)
self.app.router.add_get("/", get_index)
static_dir = os.path.join(
@@ -176,8 +189,10 @@ class Dashboard(object):
"required to build the dashboard.".format(static_dir))
self.app.router.add_static("/static", static_dir)
self.app.router.add_get("/api/node_info", node_info)
self.app.router.add_get("/api/ray_config", ray_config)
self.app.router.add_get("/api/node_info", node_info)
self.app.router.add_get("/api/logs", logs)
self.app.router.add_get("/api/errors", errors)
self.app.router.add_get("/{_}", get_forbidden)
@@ -205,6 +220,9 @@ class NodeStats(threading.Thread):
# Mapping from IP address to PID to list of log lines
self._logs = defaultdict(lambda: defaultdict(list))
# Mapping from IP address to PID to list of error messages
self._errors = defaultdict(lambda: defaultdict(list))
ray.init(redis_address=redis_address, redis_password=redis_password)
super().__init__()
@@ -254,6 +272,24 @@ class NodeStats(threading.Thread):
for y in (v["workers"] for v in self._node_stats.values())
for x in y))
def calculate_log_counts(self):
    """Count the buffered log lines for every worker.

    Returns:
        A dict mapping IP address -> PID -> number of log lines, with the
        same nesting as self._logs.
    """
    counts_by_ip = {}
    for ip, logs_for_ip in self._logs.items():
        counts_by_pid = {}
        for pid, logs_for_pid in logs_for_ip.items():
            counts_by_pid[pid] = len(logs_for_pid)
        counts_by_ip[ip] = counts_by_pid
    return counts_by_ip
def calculate_error_counts(self):
    """Count the recorded errors for every worker.

    Returns:
        A dict mapping IP address -> PID -> number of errors, with the same
        nesting as self._errors.
    """
    counts_by_ip = {}
    for ip, errors_for_ip in self._errors.items():
        counts_by_pid = {}
        for pid, errors_for_pid in errors_for_ip.items():
            counts_by_pid[pid] = len(errors_for_pid)
        counts_by_ip[ip] = counts_by_pid
    return counts_by_ip
def purge_outdated_stats(self):
def current(then, now):
if (now - then) > 5:
@@ -277,10 +313,24 @@ class NodeStats(threading.Thread):
"totals": self.calculate_totals(),
"tasks": self.calculate_tasks(),
"clients": node_stats,
"logs": self._logs,
"errors": ray.errors(all_jobs=True),
"log_counts": self.calculate_log_counts(),
"error_counts": self.calculate_error_counts(),
}
def get_logs(self, hostname, pid):
    """Return a snapshot of the buffered log lines for one host.

    Args:
        hostname: Hostname of the node, as reported in its node stats.
        pid: Worker PID as a string. A falsy value (None or "") selects
            every worker on the host.

    Returns:
        A dict mapping PID -> list of log lines. An unknown host yields an
        empty dict; a requested PID with no logs yields {pid: []}.
    """
    # The pubsub thread mutates these structures while holding the same
    # lock, so read them under it and hand back copies rather than live
    # references that could change while the response is serialized.
    with self._node_stats_lock:
        ip = self._node_stats.get(hostname, {}).get("ip")
        logs_for_ip = self._logs.get(ip, {})
        if pid:
            # .get() avoids creating an empty entry in the defaultdict.
            return {pid: list(logs_for_ip.get(pid, []))}
        return {
            worker_pid: list(lines)
            for worker_pid, lines in logs_for_ip.items()
        }
def get_errors(self, hostname, pid):
    """Return a snapshot of the recorded errors for one host.

    Args:
        hostname: Hostname of the node, as reported in its node stats.
        pid: Worker PID as a string. A falsy value (None or "") selects
            every worker on the host.

    Returns:
        A dict mapping PID -> list of error records. An unknown host yields
        an empty dict; a requested PID with no errors yields {pid: []}.
    """
    # The pubsub thread mutates these structures while holding the same
    # lock, so read them under it and hand back copies rather than live
    # references that could change while the response is serialized.
    with self._node_stats_lock:
        ip = self._node_stats.get(hostname, {}).get("ip")
        errors_for_ip = self._errors.get(ip, {})
        if pid:
            # .get() avoids creating an empty entry in the defaultdict.
            return {pid: list(errors_for_ip.get(pid, []))}
        return {
            worker_pid: list(records)
            for worker_pid, records in errors_for_ip.items()
        }
def run(self):
p = self.redis_client.pubsub(ignore_subscribe_messages=True)
@@ -291,16 +341,38 @@ class NodeStats(threading.Thread):
p.subscribe(log_channel)
logger.info("NodeStats: subscribed to {}".format(log_channel))
error_channel = ray.gcs_utils.TablePubsub.Value("ERROR_INFO_PUBSUB")
p.subscribe(error_channel)
logger.info("NodeStats: subscribed to {}".format(error_channel))
for x in p.listen():
try:
with self._node_stats_lock:
channel = ray.utils.decode(x["channel"])
data = x["data"]
if channel == log_channel:
D = json.loads(ray.utils.decode(x["data"]))
self._logs[D["ip"]][D["pid"]].extend(D["lines"])
data = json.loads(ray.utils.decode(data))
ip = data["ip"]
pid = str(data["pid"])
self._logs[ip][pid].extend(data["lines"])
elif channel == str(error_channel):
gcs_entry = ray.gcs_utils.GcsEntry.FromString(data)
error_data = ray.gcs_utils.ErrorTableData.FromString(
gcs_entry.entries[0])
message = error_data.error_message
message = re.sub(r"\x1b\[\d+m", "", message)
match = re.search(r"\(pid=(\d+), ip=(.*?)\)", message)
if match:
pid = match.group(1)
ip = match.group(2)
self._errors[ip][pid].append({
"message": message,
"timestamp": error_data.timestamp,
"type": error_data.type
})
else:
D = json.loads(ray.utils.decode(x["data"]))
self._node_stats[D["hostname"]] = D
data = json.loads(ray.utils.decode(data))
self._node_stats[data["hostname"]] = data
except Exception:
logger.exception(traceback.format_exc())
continue
+7 -5
View File
@@ -7,6 +7,8 @@ try:
except ImportError:
setproctitle = None
import ray
class RayError(Exception):
"""Super class of all ray exception types."""
@@ -33,14 +35,14 @@ class RayTaskError(RayError):
traceback_str,
cause_cls,
pid=None,
host=None):
ip=None):
"""Initialize a RayTaskError."""
if setproctitle:
self.proctitle = setproctitle.getproctitle()
else:
self.proctitle = "ray_worker"
self.pid = pid or os.getpid()
self.host = host or os.uname()[1]
self.ip = ip or ray.services.get_node_ip_address()
self.function_name = function_name
self.traceback_str = traceback_str
self.cause_cls = cause_cls
@@ -67,7 +69,7 @@ class RayTaskError(RayError):
cls.__qualname__ = name
return cls(self.function_name, self.traceback_str, self.cause_cls,
self.pid, self.host)
self.pid, self.ip)
cls.original = self
return cls
@@ -78,9 +80,9 @@ class RayTaskError(RayError):
in_worker = False
for line in lines:
if line.startswith("Traceback "):
out.append("{}{}{} (pid={}, host={})".format(
out.append("{}{}{} (pid={}, ip={})".format(
colorama.Fore.CYAN, self.proctitle, colorama.Fore.RESET,
self.pid, self.host))
self.pid, self.ip))
elif in_worker:
in_worker = False
elif "ray/worker.py" in line or "ray/function_manager.py" in line:
+3 -12
View File
@@ -9,7 +9,6 @@ import os
import traceback
import time
import datetime
from socket import AddressFamily
try:
import psutil
@@ -19,6 +18,7 @@ except ImportError:
sys.exit(1)
import ray.ray_constants as ray_constants
import ray.services
import ray.utils
# Logger for this module. It should be configured at the entry point
@@ -52,15 +52,6 @@ def is_worker(cmdline):
return cmdline and cmdline[0].startswith("ray_")
def determine_ip_address():
    """Return the first IP address for an ethernet interface on the system."""
    ipv4_addresses = []
    for interface_name, interface_addrs in psutil.net_if_addrs().items():
        # Only interfaces whose name starts with "e" are considered.
        if interface_name[0] != "e":
            continue
        for addr in interface_addrs:
            if addr.family == AddressFamily.AF_INET:
                ipv4_addresses.append(addr.address)
    return ipv4_addresses[0]
def to_posix_time(dt):
    """Return the seconds elapsed between 1970-01-01 00:00:00 and `dt`.

    `dt` is compared against a naive epoch datetime, so it must be naive too.
    """
    epoch = datetime.datetime(1970, 1, 1)
    elapsed = dt - epoch
    return elapsed.total_seconds()
@@ -77,7 +68,7 @@ class Reporter(object):
def __init__(self, redis_address, redis_password=None):
"""Initialize the reporter object."""
self.cpu_counts = (psutil.cpu_count(), psutil.cpu_count(logical=False))
self.ip_addr = determine_ip_address()
self.ip = ray.services.get_node_ip_address()
self.hostname = os.uname().nodename
_ = psutil.cpu_percent() # For initialization
@@ -145,7 +136,7 @@ class Reporter(object):
return {
"now": now,
"hostname": self.hostname,
"ip": self.ip_addr,
"ip": self.ip,
"cpu": self.get_cpu_percent(),
"cpus": self.cpu_counts,
"mem": self.get_mem_usage(),