From dfcd399dbbf8c1f61050c2bfea60fd2b326f69b0 Mon Sep 17 00:00:00 2001 From: alanamarzoev Date: Mon, 31 Jul 2017 20:49:31 -0700 Subject: [PATCH] Cluster heat map. (#792) --- python/ray/WebUI.ipynb | 152 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 142 insertions(+), 10 deletions(-) diff --git a/python/ray/WebUI.ipynb b/python/ray/WebUI.ipynb index 9fd7831b9..f3714a31c 100644 --- a/python/ray/WebUI.ipynb +++ b/python/ray/WebUI.ipynb @@ -505,19 +505,21 @@ " if \"NumCPUs\" in client: \n", " num_cpus += client[\"NumCPUs\"]\n", "\n", - "def compute_utilizations(abs_earliest, abs_latest, num_tasks, tasks, num_buckets):\n", - " # Determine what the earliest and latest tasks are out of the ones that are passed in\n", - " earliest_time = time.time()\n", - " latest_time = 0\n", + "def compute_utilizations(abs_earliest, abs_latest, num_tasks, tasks, num_buckets, use_abs_times=False):\n", " \n", " if len(tasks) == 0:\n", " return [], [], []\n", " \n", - " sum_len = 0\n", - " for task_id, data in tasks.items():\n", - " latest_time = max((latest_time, data[\"store_outputs_end\"]))\n", - " earliest_time = min((earliest_time, data[\"get_arguments_start\"]))\n", - " sum_len += data[\"store_outputs_end\"] - data[\"get_arguments_start\"]\n", + " if use_abs_times:\n", + " earliest_time = abs_earliest\n", + " latest_time = abs_latest\n", + " else:\n", + " # Determine what the earliest and latest tasks are out of the ones that are passed in\n", + " earliest_time = time.time()\n", + " latest_time = 0\n", + " for task_id, data in tasks.items():\n", + " latest_time = max((latest_time, data[\"store_outputs_end\"]))\n", + " earliest_time = min((earliest_time, data[\"get_arguments_start\"]))\n", " \n", " # Add some epsilon to latest_time to ensure that the end time of the last task\n", " # falls __within__ a bucket, and not on the edge\n", @@ -543,7 +545,8 @@ " task_end_time_within_bucket = min(task_end_time, bucket_end_time)\n", " task_cpu_time_within_bucket = task_end_time_within_bucket - task_start_time_within_bucket\n", " \n", - " cpu_time[bucket_idx] += task_cpu_time_within_bucket\n", + " if bucket_idx > -1 and bucket_idx < num_buckets: \n", + " cpu_time[bucket_idx] += task_cpu_time_within_bucket\n", " \n", " # Cpu_utilization is the average cpu utilization of the bucket, which is just\n", " # cpu_time divided by bucket_time_length\n", @@ -608,6 +611,135 @@ "\n", "plot_utilization()" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Cluster usage over time \"heat map\"." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "from bokeh.io import show, output_notebook, push_notebook\n", + "from bokeh.resources import CDN\n", + "from bokeh.plotting import figure, helpers\n", + "from bokeh.models import (\n", + " ColumnDataSource,\n", + " HoverTool,\n", + " LinearColorMapper,\n", + " BasicTicker,\n", + " PrintfTickFormatter,\n", + " ColorBar,\n", + ")\n", + "output_notebook(resources=CDN)\n", + "\n", + "# Function to create the cluster usage \"heat map\" \n", + "def cluster_usage():\n", + " \n", + " # Initial values \n", + " source = ColumnDataSource(data={\"node_ip_address\":['127.0.0.1'], \"time\":['0.5'], \"num_tasks\":['1'], \"length\": [1]})\n", + "\n", + " # Define the color schema \n", + " colors = [\"#75968f\", \"#a5bab7\", \"#c9d9d3\", \"#e2e2e2\", \"#dfccce\", \"#ddb7b1\", \"#cc7878\", \"#933b41\", \"#550b1d\"]\n", + " mapper = LinearColorMapper(palette=colors, low=0, high=2)\n", + "\n", + " TOOLS = \"hover, save, xpan, box_zoom, reset, xwheel_zoom\"\n", + " \n", + " # Create the plot \n", + " p = figure(title=\"Cluster Usage\", y_range=list(set(source.data['node_ip_address'])),\n", + " x_axis_location=\"above\", plot_width=900, plot_height=500,\n", + " tools=TOOLS, toolbar_location='below')\n", + "\n", + " # Format the plot axes \n", + " p.grid.grid_line_color = None\n", + " p.axis.axis_line_color = None\n", + " p.axis.major_tick_line_color = None\n", + " p.axis.major_label_text_font_size = \"10pt\"\n", + " p.axis.major_label_standoff = 0\n", + " p.xaxis.major_label_orientation = np.pi / 3\n", + "\n", + " # Plot rectangles\n", + " p.rect(x=\"time\", y=\"node_ip_address\", width=\"length\", height=1,\n", + " source=source,\n", + " fill_color={\"field\": \"num_tasks\", \"transform\": mapper},\n", + " line_color=None)\n", + "\n", + " # Add legend to the side of the plot\n", + " color_bar = ColorBar(color_mapper=mapper, major_label_text_font_size=\"8pt\",\n", + " ticker=BasicTicker(desired_num_ticks=len(colors)),\n", + " label_standoff=6, border_line_color=None, location=(0, 0))\n", + " p.add_layout(color_bar, \"right\")\n", + "\n", + " # Define hover tool\n", + " p.select_one(HoverTool).tooltips = [\n", + " (\"Node IP Address\", \"@node_ip_address\"),\n", + " (\"Number of tasks running\", \"@num_tasks\"),\n", + " (\"Time\", \"@time\")\n", + " ]\n", + "\n", + " # Define the axis labels \n", + " p.xaxis.axis_label = \"Time in seconds\"\n", + " p.yaxis.axis_label = \"Node IP Address\"\n", + " handle = show(p, notebook_handle=True)\n", + " workers = ray.global_state.workers()\n", + " # Function to update the heat map \n", + " def heat_map_update(abs_earliest, abs_latest, abs_num_tasks, tasks):\n", + " if len(tasks) == 0:\n", + " return\n", + " \n", + " granularity = 1\n", + " earliest = time.time()\n", + " latest = 0\n", + " \n", + " node_to_tasks = dict()\n", + " # Determine which task has the earlest start time out of the ones passed into the \n", + " # update function\n", + " for task_id, data in tasks.items():\n", + " if data[\"score\"] > latest:\n", + " latest = data[\"score\"]\n", + " if data[\"score\"] < earliest:\n", + " earliest = data[\"score\"]\n", + " worker_id = data[\"worker_id\"]\n", + " node_ip = workers[worker_id][\"node_ip_address\"]\n", + " if node_ip not in node_to_tasks: \n", + " node_to_tasks[node_ip] = {}\n", + " node_to_tasks[node_ip][task_id] = data\n", + " \n", + " nodes = []\n", + " times = []\n", + " lengths = []\n", + " num_tasks = []\n", + " \n", + " for node_ip, task_dict in node_to_tasks.items():\n", + " left, right, top = compute_utilizations(earliest, latest, abs_num_tasks, task_dict, 100, True)\n", + " for (l, r, t) in zip(left, right, top):\n", + " nodes.append(node_ip)\n", + " times.append((l + r) / 2)\n", + " lengths.append(r - l)\n", + " num_tasks.append(t)\n", + "\n", + " # Set the y range of the plot to be the node IP addresses \n", + " p.y_range.factors = list(set(nodes))\n", + " \n", + " mapper.low = min(min(num_tasks), 0)\n", + " mapper.high = max(max(num_tasks), 1)\n", + "\n", + " # Update plot with new data based on slider and text box values \n", + " source.data = {\"node_ip_address\": nodes, \"time\": times, \"num_tasks\": num_tasks, \"length\": lengths}\n", + " \n", + " push_notebook(handle=handle)\n", + "\n", + " get_sliders(heat_map_update)\n", + " \n", + "cluster_usage()" + ] } ], "metadata": {