From f545418c3f3348866c912072a12b607af497518e Mon Sep 17 00:00:00 2001
From: Max Fitton
Date: Wed, 11 Nov 2020 14:55:54 -0800
Subject: [PATCH] [Dashboard] Fix dashboard regression caused by logCount and errCount being removed from worker payload (#11954)

---
 dashboard/datacenter.py                       |  8 ++++
 .../tests/test_stats_collector.py             | 42 +++++++++++++------
 2 files changed, 37 insertions(+), 13 deletions(-)

diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py
index edf0e1545..cb699a39f 100644
--- a/dashboard/datacenter.py
+++ b/dashboard/datacenter.py
@@ -93,6 +93,11 @@ class DataOrganizer:
     @classmethod
     async def get_node_workers(cls, node_id):
         workers = []
+        node_ip = DataSource.node_id_to_ip[node_id]
+        node_logs = DataSource.ip_and_pid_to_logs.get(node_ip, {})
+        logger.error(node_logs)
+        node_errs = DataSource.ip_and_pid_to_errors.get(node_ip, {})
+        logger.error(node_errs)
         node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
         node_stats = DataSource.node_stats.get(node_id, {})
         # Merge coreWorkerStats (node stats) to workers (node physical stats)
@@ -107,6 +112,9 @@ class DataOrganizer:
         for worker in node_physical_stats.get("workers", []):
             worker = dict(worker)
             pid = worker["pid"]
+            logger.error(f"pid={pid}")
+            worker["logCount"] = len(node_logs.get(str(pid), []))
+            worker["errorCount"] = len(node_errs.get(str(pid), []))
             worker["coreWorkerStats"] = pid_to_worker_stats.get(pid, [])
             worker["language"] = pid_to_language.get(
                 pid, dashboard_consts.DEFAULT_LANGUAGE)
diff --git a/dashboard/modules/stats_collector/tests/test_stats_collector.py b/dashboard/modules/stats_collector/tests/test_stats_collector.py
index 06824656f..5e8087473 100644
--- a/dashboard/modules/stats_collector/tests/test_stats_collector.py
+++ b/dashboard/modules/stats_collector/tests/test_stats_collector.py
@@ -137,31 +137,47 @@ def test_get_all_node_details(disable_aiohttp_cache, ray_start_with_dashboard):
     @ray.remote
     class ActorWithObjs:
         def __init__(self):
+            print("I also log a line")
             self.obj_ref = ray.put([1, 2, 3])
 
         def get_obj(self):
             return ray.get(self.obj_ref)
 
     actors = [ActorWithObjs.remote() for _ in range(2)]  # noqa
+    timeout_seconds = 20
+    start_time = time.time()
+    last_ex = None
 
     def check_node_details():
         resp = requests.get(f"{webui_url}/nodes?view=details")
         resp_json = resp.json()
         resp_data = resp_json["data"]
-        try:
-            clients = resp_data["clients"]
-            node = clients[0]
-            assert len(clients) == 1
-            assert len(node.get("actors")) == 2
-            # Workers information should be in the detailed payload
-            assert "workers" in node
-            assert "logCount" in node
-            assert len(node["workers"]) == 2
-            return True
-        except (AssertionError, KeyError, IndexError):
-            return False
+        clients = resp_data["clients"]
+        node = clients[0]
+        assert len(clients) == 1
+        assert len(node.get("actors")) == 2
+        # Workers information should be in the detailed payload
+        assert "workers" in node
+        assert "logCount" in node
+        assert node["logCount"] == 2
+        print(node["workers"])
+        assert len(node["workers"]) == 2
+        assert node["workers"][0]["logCount"] == 1
 
-    wait_for_condition(check_node_details, 15)
+    while True:
+        time.sleep(1)
+        try:
+            check_node_details()
+            break
+        except (AssertionError, KeyError, IndexError) as ex:
+            last_ex = ex
+        finally:
+            if time.time() > start_time + timeout_seconds:
+                ex_stack = traceback.format_exception(
+                    type(last_ex), last_ex,
+                    last_ex.__traceback__) if last_ex else []
+                ex_stack = "".join(ex_stack)
+                raise Exception(f"Timed out while testing, {ex_stack}")
 
 
 @pytest.mark.parametrize(