mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 18:06:25 +08:00
[Dashboard] Fix dashboard regression caused by logCount and errCount being removed from worker payload (#11954)
This commit is contained in:
@@ -93,6 +93,11 @@ class DataOrganizer:
|
||||
@classmethod
|
||||
async def get_node_workers(cls, node_id):
|
||||
workers = []
|
||||
node_ip = DataSource.node_id_to_ip[node_id]
|
||||
node_logs = DataSource.ip_and_pid_to_logs.get(node_ip, {})
|
||||
logger.error(node_logs)
|
||||
node_errs = DataSource.ip_and_pid_to_errors.get(node_ip, {})
|
||||
logger.error(node_errs)
|
||||
node_physical_stats = DataSource.node_physical_stats.get(node_id, {})
|
||||
node_stats = DataSource.node_stats.get(node_id, {})
|
||||
# Merge coreWorkerStats (node stats) to workers (node physical stats)
|
||||
@@ -107,6 +112,9 @@ class DataOrganizer:
|
||||
for worker in node_physical_stats.get("workers", []):
|
||||
worker = dict(worker)
|
||||
pid = worker["pid"]
|
||||
logger.error(f"pid={pid}")
|
||||
worker["logCount"] = len(node_logs.get(str(pid), []))
|
||||
worker["errorCount"] = len(node_errs.get(str(pid), []))
|
||||
worker["coreWorkerStats"] = pid_to_worker_stats.get(pid, [])
|
||||
worker["language"] = pid_to_language.get(
|
||||
pid, dashboard_consts.DEFAULT_LANGUAGE)
|
||||
|
||||
@@ -137,31 +137,47 @@ def test_get_all_node_details(disable_aiohttp_cache, ray_start_with_dashboard):
|
||||
@ray.remote
class ActorWithObjs:
    """Ray actor that prints one line on start-up and keeps an object ref."""

    def __init__(self):
        # One stdout line per actor; presumably this is what the logCount
        # assertions later in the test are counting -- confirm.
        print("I also log a line")
        payload = [1, 2, 3]
        self.obj_ref = ray.put(payload)

    def get_obj(self):
        """Return the value behind the reference created in ``__init__``."""
        stored = ray.get(self.obj_ref)
        return stored
|
||||
|
||||
actors = [ActorWithObjs.remote() for _ in range(2)] # noqa
|
||||
timeout_seconds = 20
|
||||
start_time = time.time()
|
||||
last_ex = None
|
||||
|
||||
def check_node_details():
    """Assert that the detailed node payload carries log counts.

    Requests ``{webui_url}/nodes?view=details`` and checks that exactly one
    node is reported, that it lists two actors and two workers, and that
    ``logCount`` is present on the node and on the first worker.

    Raises:
        AssertionError: an expectation about the payload does not hold.
        KeyError: an expected key is missing from the payload.
        IndexError: the ``clients`` or ``workers`` list is empty.
    """
    resp = requests.get(f"{webui_url}/nodes?view=details")
    resp_json = resp.json()
    resp_data = resp_json["data"]
    clients = resp_data["clients"]
    node = clients[0]
    assert len(clients) == 1
    assert len(node.get("actors")) == 2
    # Workers information should be in the detailed payload
    assert "workers" in node
    assert "logCount" in node
    assert node["logCount"] == 2
    print(node["workers"])
    assert len(node["workers"]) == 2
    assert node["workers"][0]["logCount"] == 1

# Poll once per second until the payload is complete or the deadline passes.
while True:
    time.sleep(1)
    try:
        check_node_details()
        break
    except (AssertionError, KeyError, IndexError) as ex:
        # Expected while the dashboard is still collecting data; remember
        # the failure so the timeout error can show the most recent one.
        last_ex = ex
    # Deliberately NOT in a ``finally`` clause: a ``finally`` also runs on
    # the successful ``break`` (turning a late success into a timeout) and
    # would replace any unexpected exception with the timeout error.
    if time.time() > start_time + timeout_seconds:
        ex_stack = traceback.format_exception(
            type(last_ex), last_ex, last_ex.__traceback__)
        ex_stack = "".join(ex_stack)
        raise Exception(f"Timed out while testing, {ex_stack}") from last_ex
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
||||
Reference in New Issue
Block a user