From cc2f43c8266b9ad7a200065460dda81e901d0aca Mon Sep 17 00:00:00 2001 From: Max Fitton Date: Mon, 7 Dec 2020 21:41:13 -0800 Subject: [PATCH] [Dashboard][Bugfix] Fix bug in display of worker logs and errors in Dashboard (#12660) * Fix bug with worker logs/errors not displaying in the dashboard * Add error endpoint test. * lint --- .../stats_collector/stats_collector_head.py | 18 +-- .../tests/test_stats_collector.py | 119 +++++++++++++++++- 2 files changed, 124 insertions(+), 13 deletions(-) diff --git a/dashboard/modules/stats_collector/stats_collector_head.py b/dashboard/modules/stats_collector/stats_collector_head.py index c819f988c..1224b6d62 100644 --- a/dashboard/modules/stats_collector/stats_collector_head.py +++ b/dashboard/modules/stats_collector/stats_collector_head.py @@ -147,20 +147,22 @@ class StatsCollector(dashboard_utils.DashboardHeadModule): @routes.get("/node_logs") async def get_logs(self, req) -> aiohttp.web.Response: ip = req.query["ip"] - pid = req.query.get("pid") - node_logs = DataSource.ip_and_pid_to_logs[ip] - payload = node_logs.get(pid, []) if pid else node_logs + pid = str(req.query.get("pid", "")) + node_logs = DataSource.ip_and_pid_to_logs.get(ip, {}) + if pid: + node_logs = {str(pid): node_logs.get(pid, [])} return dashboard_utils.rest_response( - success=True, message="Fetched logs.", logs=payload) + success=True, message="Fetched logs.", logs=node_logs) @routes.get("/node_errors") async def get_errors(self, req) -> aiohttp.web.Response: ip = req.query["ip"] - pid = req.query.get("pid") - node_errors = DataSource.ip_and_pid_to_errors[ip] - filtered_errs = node_errors.get(pid, []) if pid else node_errors + pid = str(req.query.get("pid", "")) + node_errors = DataSource.ip_and_pid_to_errors.get(ip, {}) + if pid: + node_errors = {str(pid): node_errors.get(pid, [])} return dashboard_utils.rest_response( - success=True, message="Fetched errors.", errors=filtered_errs) + success=True, message="Fetched errors.", errors=node_errors) async def _update_actors(self): # Subscribe actor channel. diff --git a/dashboard/modules/stats_collector/tests/test_stats_collector.py b/dashboard/modules/stats_collector/tests/test_stats_collector.py index 5771e7e04..39fb3669c 100644 --- a/dashboard/modules/stats_collector/tests/test_stats_collector.py +++ b/dashboard/modules/stats_collector/tests/test_stats_collector.py @@ -8,11 +8,9 @@ import traceback import pytest import ray from ray.new_dashboard.tests.conftest import * # noqa -from ray.test_utils import ( - format_web_url, - wait_until_server_available, - wait_for_condition, -) +from ray.test_utils import (format_web_url, wait_until_server_available, + wait_for_condition, + wait_until_succeeded_without_exception) logger = logging.getLogger(__name__) @@ -219,5 +217,116 @@ def test_multi_nodes_info(enable_test_module, disable_aiohttp_cache, wait_for_condition(_check_nodes, timeout=10) +@pytest.mark.parametrize( + "ray_start_cluster_head", [{ + "include_dashboard": True + }], indirect=True) +def test_logs(enable_test_module, disable_aiohttp_cache, + ray_start_cluster_head): + cluster = ray_start_cluster_head + assert (wait_until_server_available(cluster.webui_url) is True) + webui_url = cluster.webui_url + webui_url = format_web_url(webui_url) + nodes = ray.nodes() + assert len(nodes) == 1 + node_ip = nodes[0]["NodeManagerAddress"] + + @ray.remote + class LoggingActor: + def go(self, n): + i = 0 + while i < n: + print(f"On number {i}") + i += 1 + + def get_pid(self): + return os.getpid() + + la = LoggingActor.remote() + la2 = LoggingActor.remote() + la_pid = str(ray.get(la.get_pid.remote())) + la2_pid = str(ray.get(la2.get_pid.remote())) + ray.get(la.go.remote(4)) + ray.get(la2.go.remote(1)) + + def check_logs(): + node_logs_response = requests.get( + f"{webui_url}/node_logs", params={"ip": node_ip}) + node_logs_response.raise_for_status() + node_logs = node_logs_response.json() + assert node_logs["result"] + assert type(node_logs["data"]["logs"]) is dict + assert all( + pid in node_logs["data"]["logs"] for pid in (la_pid, la2_pid)) + assert len(node_logs["data"]["logs"][la2_pid]) == 1 + + actor_one_logs_response = requests.get( + f"{webui_url}/node_logs", + params={ + "ip": node_ip, + "pid": str(la_pid) + }) + actor_one_logs_response.raise_for_status() + actor_one_logs = actor_one_logs_response.json() + assert actor_one_logs["result"] + assert type(actor_one_logs["data"]["logs"]) is dict + assert len(actor_one_logs["data"]["logs"][la_pid]) == 4 + + wait_until_succeeded_without_exception( + check_logs, (AssertionError), timeout_ms=1000) + + +@pytest.mark.parametrize( + "ray_start_cluster_head", [{ + "include_dashboard": True + }], indirect=True) +def test_errors(enable_test_module, disable_aiohttp_cache, + ray_start_cluster_head): + cluster = ray_start_cluster_head + assert (wait_until_server_available(cluster.webui_url) is True) + webui_url = cluster.webui_url + webui_url = format_web_url(webui_url) + nodes = ray.nodes() + assert len(nodes) == 1 + node_ip = nodes[0]["NodeManagerAddress"] + + @ray.remote + class ErrorActor(): + def go(self): + raise ValueError("This is an error") + + def get_pid(self): + return os.getpid() + + ea = ErrorActor.remote() + ea_pid = ea.get_pid.remote() + ea.go.remote() + + def check_errs(): + node_errs_response = requests.get( + f"{webui_url}/node_logs", params={"ip": node_ip}) + node_errs_response.raise_for_status() + node_errs = node_errs_response.json() + assert node_errs["result"] + assert type(node_errs["data"]["errors"]) is dict + assert ea_pid in node_errs["data"]["errors"] + assert len(node_errs["data"]["errors"][ea_pid]) == 1 + + actor_err_response = requests.get( + f"{webui_url}/node_logs", + params={ + "ip": node_ip, + "pid": str(ea_pid) + }) + actor_err_response.raise_for_status() + actor_errs = actor_err_response.json() + assert actor_errs["result"] + assert type(actor_errs["data"]["errors"]) is dict + assert len(actor_errs["data"]["errors"][ea_pid]) == 4 + + wait_until_succeeded_without_exception( + check_errs, (AssertionError), timeout_ms=1000) + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__]))