[Dashboard][Bugfix] Fix bug in display of worker logs and errors in Dashboard (#12660)

* Fix bug with worker logs/errors not displaying in the dashboard

* Add error endpoint test.

* lint
This commit is contained in:
Max Fitton
2020-12-07 21:41:13 -08:00
committed by GitHub
parent 34b9c7449b
commit cc2f43c826
2 changed files with 124 additions and 13 deletions
@@ -147,20 +147,22 @@ class StatsCollector(dashboard_utils.DashboardHeadModule):
@routes.get("/node_logs")
async def get_logs(self, req) -> aiohttp.web.Response:
ip = req.query["ip"]
pid = req.query.get("pid")
node_logs = DataSource.ip_and_pid_to_logs[ip]
payload = node_logs.get(pid, []) if pid else node_logs
pid = str(req.query.get("pid", ""))
node_logs = DataSource.ip_and_pid_to_logs.get(ip, {})
if pid:
node_logs = {str(pid): node_logs.get(pid, [])}
return dashboard_utils.rest_response(
success=True, message="Fetched logs.", logs=payload)
success=True, message="Fetched logs.", logs=node_logs)
@routes.get("/node_errors")
async def get_errors(self, req) -> aiohttp.web.Response:
ip = req.query["ip"]
pid = req.query.get("pid")
node_errors = DataSource.ip_and_pid_to_errors[ip]
filtered_errs = node_errors.get(pid, []) if pid else node_errors
pid = str(req.query.get("pid", ""))
node_errors = DataSource.ip_and_pid_to_errors.get(ip, {})
if pid:
node_errors = {str(pid): node_errors.get(pid, [])}
return dashboard_utils.rest_response(
success=True, message="Fetched errors.", errors=filtered_errs)
success=True, message="Fetched errors.", errors=node_errors)
async def _update_actors(self):
# Subscribe actor channel.
@@ -8,11 +8,9 @@ import traceback
import pytest
import ray
from ray.new_dashboard.tests.conftest import * # noqa
from ray.test_utils import (
format_web_url,
wait_until_server_available,
wait_for_condition,
)
from ray.test_utils import (format_web_url, wait_until_server_available,
wait_for_condition,
wait_until_succeeded_without_exception)
logger = logging.getLogger(__name__)
@@ -219,5 +217,116 @@ def test_multi_nodes_info(enable_test_module, disable_aiohttp_cache,
wait_for_condition(_check_nodes, timeout=10)
@pytest.mark.parametrize(
"ray_start_cluster_head", [{
"include_dashboard": True
}], indirect=True)
def test_logs(enable_test_module, disable_aiohttp_cache,
ray_start_cluster_head):
cluster = ray_start_cluster_head
assert (wait_until_server_available(cluster.webui_url) is True)
webui_url = cluster.webui_url
webui_url = format_web_url(webui_url)
nodes = ray.nodes()
assert len(nodes) == 1
node_ip = nodes[0]["NodeManagerAddress"]
@ray.remote
class LoggingActor:
def go(self, n):
i = 0
while i < n:
print(f"On number {i}")
i += 1
def get_pid(self):
return os.getpid()
la = LoggingActor.remote()
la2 = LoggingActor.remote()
la_pid = str(ray.get(la.get_pid.remote()))
la2_pid = str(ray.get(la2.get_pid.remote()))
ray.get(la.go.remote(4))
ray.get(la2.go.remote(1))
def check_logs():
node_logs_response = requests.get(
f"{webui_url}/node_logs", params={"ip": node_ip})
node_logs_response.raise_for_status()
node_logs = node_logs_response.json()
assert node_logs["result"]
assert type(node_logs["data"]["logs"]) is dict
assert all(
pid in node_logs["data"]["logs"] for pid in (la_pid, la2_pid))
assert len(node_logs["data"]["logs"][la2_pid]) == 1
actor_one_logs_response = requests.get(
f"{webui_url}/node_logs",
params={
"ip": node_ip,
"pid": str(la_pid)
})
actor_one_logs_response.raise_for_status()
actor_one_logs = actor_one_logs_response.json()
assert actor_one_logs["result"]
assert type(actor_one_logs["data"]["logs"]) is dict
assert len(actor_one_logs["data"]["logs"][la_pid]) == 4
wait_until_succeeded_without_exception(
check_logs, (AssertionError), timeout_ms=1000)
@pytest.mark.parametrize(
"ray_start_cluster_head", [{
"include_dashboard": True
}], indirect=True)
def test_errors(enable_test_module, disable_aiohttp_cache,
ray_start_cluster_head):
cluster = ray_start_cluster_head
assert (wait_until_server_available(cluster.webui_url) is True)
webui_url = cluster.webui_url
webui_url = format_web_url(webui_url)
nodes = ray.nodes()
assert len(nodes) == 1
node_ip = nodes[0]["NodeManagerAddress"]
@ray.remote
class ErrorActor():
def go(self):
raise ValueError("This is an error")
def get_pid(self):
return os.getpid()
ea = ErrorActor.remote()
ea_pid = ea.get_pid.remote()
ea.go.remote()
def check_errs():
node_errs_response = requests.get(
f"{webui_url}/node_logs", params={"ip": node_ip})
node_errs_response.raise_for_status()
node_errs = node_errs_response.json()
assert node_errs["result"]
assert type(node_errs["data"]["errors"]) is dict
assert ea_pid in node_errs["data"]["errors"]
assert len(node_errs["data"]["errors"][ea_pid]) == 1
actor_err_response = requests.get(
f"{webui_url}/node_logs",
params={
"ip": node_ip,
"pid": str(ea_pid)
})
actor_err_response.raise_for_status()
actor_errs = actor_err_response.json()
assert actor_errs["result"]
assert type(actor_errs["data"]["errors"]) is dict
assert len(actor_errs["data"]["errors"][ea_pid]) == 4
wait_until_succeeded_without_exception(
check_errs, (AssertionError), timeout_ms=1000)
if __name__ == "__main__":
sys.exit(pytest.main(["-v", __file__]))