From 2220a33b6235cf821a02ce3adb63282b99bd3cf0 Mon Sep 17 00:00:00 2001 From: Robert Nishihara Date: Sun, 19 Feb 2017 15:12:08 -0800 Subject: [PATCH] In UI, add timing information for tasks and show cluster scheduling. (#297) * In UI, add timing information for tasks and show cluster scheduling. * Factor out html generation as function. --- python/ray/worker.py | 11 ++++- webui/backend/ray_ui.py | 69 +++++++++++++++++++-------- webui/src/ray-recent-tasks.html | 19 +++----- webui/src/recent-tasks.js | 83 +++++++++++++++++++++++---------- 4 files changed, 124 insertions(+), 58 deletions(-) diff --git a/python/ray/worker.py b/python/ray/worker.py index 575965fb9..3b3b02d70 100644 --- a/python/ray/worker.py +++ b/python/ray/worker.py @@ -1203,12 +1203,19 @@ def connect(info, object_id_seed=None, mode=WORKER_MODE, worker=global_worker, a import __main__ as main driver_info = {"node_ip_address": worker.node_ip_address, "driver_id": worker.worker_id, - "start_time": time.time()} + "start_time": time.time(), + "plasma_store_socket": info["store_socket_name"], + "plasma_manager_socket": info["manager_socket_name"], + "local_scheduler_socket": info["local_scheduler_socket_name"]} driver_info["name"] = main.__file__ if hasattr(main, "__file__") else "INTERACTIVE MODE" worker.redis_client.hmset(b"Drivers:" + worker.worker_id, driver_info) elif mode == WORKER_MODE: # Register the worker with Redis. - worker.redis_client.hmset(b"Workers:" + worker.worker_id, {"node_ip_address": worker.node_ip_address}) + worker.redis_client.hmset(b"Workers:" + worker.worker_id, + {"node_ip_address": worker.node_ip_address, + "plasma_store_socket": info["store_socket_name"], + "plasma_manager_socket": info["manager_socket_name"], + "local_scheduler_socket": info["local_scheduler_socket_name"]}) else: raise Exception("This code should be unreachable.") # If this is a driver, set the current task ID, the task driver ID, and set diff --git a/webui/backend/ray_ui.py b/webui/backend/ray_ui.py index 852bb7f19..dd1241723 100644 --- a/webui/backend/ray_ui.py +++ b/webui/backend/ray_ui.py @@ -2,7 +2,7 @@ import aioredis import argparse import asyncio import binascii -from collections import defaultdict +import collections import datetime import json import numpy as np @@ -43,7 +43,10 @@ def key_to_hex_identifiers(key): task_id = hex_identifier(key[offset:(offset + IDENTIFIER_LENGTH)]) return worker_id, task_id -worker_ids = [] +async def hgetall_as_dict(redis_conn, key): + fields = await redis_conn.execute("hgetall", key) + return {fields[2 * i]: fields[2 * i + 1] for i in range(len(fields) // 2)} + # Cache information about the local schedulers. local_schedulers = {} @@ -66,8 +69,10 @@ def duration_to_string(duration): duration_str = "{0:0.1f} minutes".format(duration / 60) elif duration > 1: duration_str = "{0:0.1f} seconds".format(duration) + elif duration > 0.001: + duration_str = "{0:0.1f} milliseconds".format(duration * 1000) else: - duration_str = "{} milliseconds".format(int(duration * 1000)) + duration_str = "{} microseconds".format(int(duration * 1000000)) return duration_str async def handle_get_statistics(websocket, redis_conn): @@ -79,8 +84,7 @@ async def handle_get_statistics(websocket, redis_conn): client_keys = await redis_conn.execute("keys", "CL:*") clients = [] for client_key in client_keys: - client_fields = await redis_conn.execute("hgetall", client_key) - client_fields = {client_fields[2 * i]: client_fields[2 * i + 1] for i in range(len(client_fields) // 2)} + client_fields = await hgetall_as_dict(redis_conn, client_key) clients.append(client_fields) ip_addresses = list(set([client[b"node_ip_address"].decode("ascii") for client in clients if client[b"client_type"] == b"photon"])) num_nodes = len(ip_addresses) @@ -94,8 +98,7 @@ async def handle_get_drivers(websocket, redis_conn): keys = await redis_conn.execute("keys", "Drivers:*") drivers = [] for key in keys: - driver_fields = await redis_conn.execute("hgetall", key) - driver_fields = {driver_fields[2 * i]: driver_fields[2 * i + 1] for i in range(len(driver_fields) // 2)} + driver_fields = await hgetall_as_dict(redis_conn, key) driver_info = {"node ip address": driver_fields[b"node_ip_address"].decode("ascii"), "name": driver_fields[b"name"].decode("ascii")} @@ -122,7 +125,21 @@ async def handle_get_drivers(websocket, redis_conn): reply = sorted(drivers, key=(lambda driver: driver["start time"]))[::-1] await websocket.send(json.dumps(reply)) +node_info = collections.OrderedDict() +worker_info = collections.OrderedDict() + async def handle_get_recent_tasks(websocket, redis_conn, num_tasks): + # First update the cache of worker information. + worker_keys = await redis_conn.execute("keys", "Workers:*") + for key in worker_keys: + worker_id = hex_identifier(key[len("Workers:"):]) + if worker_id not in worker_info: + worker_info[worker_id] = await hgetall_as_dict(redis_conn, key) + node_ip_address = worker_info[worker_id][b"node_ip_address"].decode("ascii") + if node_ip_address not in node_info: + node_info[node_ip_address] = {"workers": []} + node_info[node_ip_address]["workers"].append(worker_id) + keys = await redis_conn.execute("keys", "event_log:*") if len(keys) == 0: # There are no tasks, so send a message to the client saying so. @@ -142,13 +159,21 @@ async def handle_get_recent_tasks(websocket, redis_conn, num_tasks): min_time = time_cutoff - (max_time - time_cutoff) * 0.1 max_time = max_time + (max_time - time_cutoff) * 0.1 - task_data = [] + worker_ids = list(worker_info.keys()) + node_ip_addresses = list(node_info.keys()) + + num_tasks = 0 + task_data = [{"task_data": [], + "num_workers": len(node_info[node_ip_address]["workers"])} for node_ip_address in node_ip_addresses] for i in range(len(keys)): worker_id, task_id = key_to_hex_identifiers(keys[i]) data = contents[i] if worker_id not in worker_ids: - worker_ids.append(worker_id) - worker_index = worker_ids.index(worker_id) + # This case should be extremely rare. + raise Exception("A worker ID was not present in the list of worker IDs.") + node_ip_address = worker_info[worker_id][b"node_ip_address"].decode("ascii") + worker_index = node_info[node_ip_address]["workers"].index(worker_id) + node_index = node_ip_addresses.index(node_ip_address) task_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task"] if task_times[1] <= time_cutoff: @@ -157,14 +182,21 @@ async def handle_get_recent_tasks(websocket, redis_conn, num_tasks): task_get_arguments_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task:get_arguments"] task_execute_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task:execute"] task_store_outputs_times = [timestamp for (timestamp, task, kind, info) in data if task == "ray:task:store_outputs"] - task_data.append({"task": task_times, - "get_arguments": task_get_arguments_times, - "execute": task_execute_times, - "store_outputs": task_store_outputs_times, - "worker_index": worker_index}) + task_data[node_index]["task_data"].append( + {"task": task_times, + "get_arguments": task_get_arguments_times, + "execute": task_execute_times, + "store_outputs": task_store_outputs_times, + "worker_index": worker_index, + "node_ip_address": node_ip_address, + "task_formatted_time": duration_to_string(task_times[1] - task_times[0]), + "get_arguments_formatted_time": duration_to_string(task_get_arguments_times[1] - task_get_arguments_times[0]), + "execute_formatted_time": duration_to_string(task_execute_times[1] - task_execute_times[0]), + "store_outputs_formatted_time": duration_to_string(task_store_outputs_times[1] - task_store_outputs_times[0])}) + num_tasks += 1 reply = {"min_time": min_time, "max_time": max_time, - "num_tasks": len(task_data), + "num_tasks": num_tasks, "task_data": task_data} await websocket.send(json.dumps(reply)) @@ -189,8 +221,7 @@ async def send_heartbeats(websocket, redis_conn): client_keys = await redis_conn.execute("keys", "CL:*") clients = [] for client_key in client_keys: - client_fields = await redis_conn.execute("hgetall", client_key) - client_fields = {client_fields[2 * i]: client_fields[2 * i + 1] for i in range(len(client_fields) // 2)} + client_fields = await hgetall_as_dict(redis_conn, client_key) if client_fields[b"client_type"] == b"photon": local_scheduler_id = hex_identifier(client_fields[b"ray_client_id"]) local_schedulers[local_scheduler_id] = {"node_ip_address": client_fields[b"node_ip_address"].decode("ascii"), @@ -275,7 +306,7 @@ async def serve_requests(websocket, path): "node_id": hex_identifier(content[3])}) await websocket.send(json.dumps(result)) elif command["command"] == "get-timeline": - tasks = defaultdict(list) + tasks = collections.defaultdict(list) for key in await redis_conn.execute("keys", "event_log:*"): worker_id, task_id = key_to_hex_identifiers(key) content = await redis_conn.execute("lrange", key, "0", "-1") diff --git a/webui/src/ray-recent-tasks.html b/webui/src/ray-recent-tasks.html index 10a86ff87..7ae100f89 100644 --- a/webui/src/ray-recent-tasks.html +++ b/webui/src/ray-recent-tasks.html @@ -23,18 +23,15 @@

- + +
+ diff --git a/webui/src/recent-tasks.js b/webui/src/recent-tasks.js index 2f5e87d8d..443847d62 100644 --- a/webui/src/recent-tasks.js +++ b/webui/src/recent-tasks.js @@ -1,74 +1,107 @@ -RecentTasks = function(elem, options) { +RecentTasks = function(all_recent_tasks_elem) { var self = this; - this.options = options; + var verticalPadding = 10; var barHeight = 25; - var svg = d3.select(elem) - .attr("width", this.options.width) - .attr("height", this.options.height); + var all_recent_tasks_div = d3.select(all_recent_tasks_elem); + + this.generate_task_info = function(d) { + return "
Total Time: " + d.task_formatted_time + "
" + + "
Time Getting Arguments: " + d.get_arguments_formatted_time + "
" + + "
Time in Execution: " + d.execute_formatted_time + "
" + + "
Time Storing Outputs: " + d.store_outputs_formatted_time + "
"; + } + + this.draw_new_node_tasks = function(all_task_info, task_info, width, svg, info) { + var height = task_info.num_workers * barHeight + 2 * verticalPadding; + + svg.attr("width", width) + .attr("height", height); + + var borderPath = svg.append("rect") + .attr("x", 0) + .attr("y", 0) + .attr("height", height) + .attr("width", width) + .style("stroke", "black") + .style("fill", "none") + .style("stroke-width", 1); - this.draw_new_tasks = function(task_info) { - this.task_info = task_info; var x = d3.scaleLinear() - .domain([task_info.min_time, task_info.max_time]) + .domain([all_task_info.min_time, all_task_info.max_time]) .range([-1, width + 1]); - var xAxis = d3.axisBottom(x) - .tickSize(-height); - - var gx = svg.append("g") - .attr("class", "axis axis--x") - .attr("transform", "translate(0," + (height - 10) + ")") - .call(xAxis); - var task_rects = svg.append("g").attr("class", "task_rects"); var get_arguments_rects = svg.append("g").attr("class", "get_arguments_rects"); var execute_rects = svg.append("g").attr("class", "execute_rects"); var store_outputs_rects = svg.append("g").attr("class", "store_outputs_rects"); task_rects.selectAll("rect") - .data(this.task_info.task_data) + .data(task_info.task_data) .enter() .append("rect") .attr("x", function (d) { return x(d.task[0]); }) - .attr("y", function (d) { return (d.worker_index + 1) * barHeight; }) + .attr("y", function (d) { return verticalPadding + d.worker_index * barHeight; }) .attr("width", function (d) { return x(d.task[1]) - x(d.task[0]); }) .attr("height", function (d) { return barHeight - 1; }) .attr("fill", "orange") + .attr("id", function (d) { d.store_outputs[1]; }) + .on("click", function(d, i) { + info.html(self.generate_task_info(d)); + }) get_arguments_rects.selectAll("rect") - .data(this.task_info.task_data) + .data(task_info.task_data) .enter() .append("rect") .attr("x", function (d) { return x(d.get_arguments[0]); }) - .attr("y", function (d) { return (d.worker_index + 1) * barHeight + 1; }) + .attr("y", function (d) { return verticalPadding + d.worker_index * barHeight + 1; }) .attr("width", function (d) { return x(d.get_arguments[1]) - x(d.get_arguments[0]); }) .attr("height", function (d) { return barHeight - 3; }) .attr("fill", "black") + .on("click", function(d, i) { + info.html(self.generate_task_info(d)); + }) execute_rects.selectAll("rect") - .data(this.task_info.task_data) + .data(task_info.task_data) .enter() .append("rect") .attr("x", function (d) { return x(d.execute[0]); }) - .attr("y", function (d) { return (d.worker_index + 1) * barHeight + 1; }) + .attr("y", function (d) { return verticalPadding + d.worker_index * barHeight + 1; }) .attr("width", function (d) { return x(d.execute[1]) - x(d.execute[0]); }) .attr("height", function (d) { return barHeight - 3; }) .attr("fill", "blue") + .on("click", function(d, i) { + info.html(self.generate_task_info(d)); + }) store_outputs_rects.selectAll("rect") - .data(this.task_info.task_data) + .data(task_info.task_data) .enter() .append("rect") .attr("x", function (d) { return x(d.store_outputs[0]); }) - .attr("y", function (d) { return (d.worker_index + 1) * barHeight + 1; }) + .attr("y", function (d) { return verticalPadding + d.worker_index * barHeight + 1; }) .attr("width", function (d) { return x(d.store_outputs[1]) - x(d.store_outputs[0]); }) .attr("height", function (d) { return barHeight - 3; }) .attr("fill", "green") + .on("click", function(d, i) { + info.html(self.generate_task_info(d)); + }) + } + + this.draw_new_tasks = function(all_task_info, width) { + // Call draw_new_node_tasks once for each node. + for (i = 0; i < all_task_info.task_data.length; i++) { + var new_svg = all_recent_tasks_div.append("svg"); + var info = all_recent_tasks_div.append("div"); + this.draw_new_node_tasks(all_task_info, all_task_info.task_data[i], width, new_svg, info); + } } this.erase = function() { - svg.selectAll("g").remove() + all_recent_tasks_div.selectAll("svg").remove(); + all_recent_tasks_div.selectAll("div").remove(); } }