mirror of
https://github.com/wassname/ray.git
synced 2026-06-27 21:38:18 +08:00
Cluster heat map. (#792)
This commit is contained in:
committed by
Robert Nishihara
parent
a2852f2329
commit
dfcd399dbb
+142
-10
@@ -505,19 +505,21 @@
|
||||
" if \"NumCPUs\" in client: \n",
|
||||
" num_cpus += client[\"NumCPUs\"]\n",
|
||||
"\n",
|
||||
"def compute_utilizations(abs_earliest, abs_latest, num_tasks, tasks, num_buckets):\n",
|
||||
" # Determine what the earliest and latest tasks are out of the ones that are passed in\n",
|
||||
" earliest_time = time.time()\n",
|
||||
" latest_time = 0\n",
|
||||
"def compute_utilizations(abs_earliest, abs_latest, num_tasks, tasks, num_buckets, use_abs_times=False):\n",
|
||||
" \n",
|
||||
" if len(tasks) == 0:\n",
|
||||
" return [], [], []\n",
|
||||
" \n",
|
||||
" sum_len = 0\n",
|
||||
" for task_id, data in tasks.items():\n",
|
||||
" latest_time = max((latest_time, data[\"store_outputs_end\"]))\n",
|
||||
" earliest_time = min((earliest_time, data[\"get_arguments_start\"]))\n",
|
||||
" sum_len += data[\"store_outputs_end\"] - data[\"get_arguments_start\"]\n",
|
||||
" if use_abs_times:\n",
|
||||
" earliest_time = abs_earliest\n",
|
||||
" latest_time = abs_latest\n",
|
||||
" else:\n",
|
||||
" # Determine what the earliest and latest tasks are out of the ones that are passed in\n",
|
||||
" earliest_time = time.time()\n",
|
||||
" latest_time = 0\n",
|
||||
" for task_id, data in tasks.items():\n",
|
||||
" latest_time = max((latest_time, data[\"store_outputs_end\"]))\n",
|
||||
" earliest_time = min((earliest_time, data[\"get_arguments_start\"]))\n",
|
||||
" \n",
|
||||
" # Add some epsilon to latest_time to ensure that the end time of the last task\n",
|
||||
" # falls __within__ a bucket, and not on the edge\n",
|
||||
@@ -543,7 +545,8 @@
|
||||
" task_end_time_within_bucket = min(task_end_time, bucket_end_time)\n",
|
||||
" task_cpu_time_within_bucket = task_end_time_within_bucket - task_start_time_within_bucket\n",
|
||||
" \n",
|
||||
" cpu_time[bucket_idx] += task_cpu_time_within_bucket\n",
|
||||
" if bucket_idx > -1 and bucket_idx < num_buckets: \n",
|
||||
" cpu_time[bucket_idx] += task_cpu_time_within_bucket\n",
|
||||
" \n",
|
||||
" # Cpu_utilization is the average cpu utilization of the bucket, which is just\n",
|
||||
" # cpu_time divided by bucket_time_length\n",
|
||||
@@ -608,6 +611,135 @@
|
||||
"\n",
|
||||
"plot_utilization()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Cluster usage over time \"heat map\"."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from bokeh.io import show, output_notebook, push_notebook\n",
|
||||
"from bokeh.resources import CDN\n",
|
||||
"from bokeh.plotting import figure, helpers\n",
|
||||
"from bokeh.models import (\n",
|
||||
" ColumnDataSource,\n",
|
||||
" HoverTool,\n",
|
||||
" LinearColorMapper,\n",
|
||||
" BasicTicker,\n",
|
||||
" PrintfTickFormatter,\n",
|
||||
" ColorBar,\n",
|
||||
")\n",
|
||||
"output_notebook(resources=CDN)\n",
|
||||
"\n",
|
||||
"# Function to create the cluster usage \"heat map\" \n",
|
||||
"def cluster_usage():\n",
|
||||
" \n",
|
||||
" # Initial values \n",
|
||||
" source = ColumnDataSource(data={\"node_ip_address\":['127.0.0.1'], \"time\":['0.5'], \"num_tasks\":['1'], \"length\": [1]})\n",
|
||||
"\n",
|
||||
" # Define the color schema \n",
|
||||
" colors = [\"#75968f\", \"#a5bab7\", \"#c9d9d3\", \"#e2e2e2\", \"#dfccce\", \"#ddb7b1\", \"#cc7878\", \"#933b41\", \"#550b1d\"]\n",
|
||||
" mapper = LinearColorMapper(palette=colors, low=0, high=2)\n",
|
||||
"\n",
|
||||
" TOOLS = \"hover, save, xpan, box_zoom, reset, xwheel_zoom\"\n",
|
||||
" \n",
|
||||
" # Create the plot \n",
|
||||
" p = figure(title=\"Cluster Usage\", y_range=list(set(source.data['node_ip_address'])),\n",
|
||||
" x_axis_location=\"above\", plot_width=900, plot_height=500,\n",
|
||||
" tools=TOOLS, toolbar_location='below')\n",
|
||||
"\n",
|
||||
" # Format the plot axes \n",
|
||||
" p.grid.grid_line_color = None\n",
|
||||
" p.axis.axis_line_color = None\n",
|
||||
" p.axis.major_tick_line_color = None\n",
|
||||
" p.axis.major_label_text_font_size = \"10pt\"\n",
|
||||
" p.axis.major_label_standoff = 0\n",
|
||||
" p.xaxis.major_label_orientation = np.pi / 3\n",
|
||||
"\n",
|
||||
" # Plot rectangles\n",
|
||||
" p.rect(x=\"time\", y=\"node_ip_address\", width=\"length\", height=1,\n",
|
||||
" source=source,\n",
|
||||
" fill_color={\"field\": \"num_tasks\", \"transform\": mapper},\n",
|
||||
" line_color=None)\n",
|
||||
"\n",
|
||||
" # Add legend to the side of the plot\n",
|
||||
" color_bar = ColorBar(color_mapper=mapper, major_label_text_font_size=\"8pt\",\n",
|
||||
" ticker=BasicTicker(desired_num_ticks=len(colors)),\n",
|
||||
" label_standoff=6, border_line_color=None, location=(0, 0))\n",
|
||||
" p.add_layout(color_bar, \"right\")\n",
|
||||
"\n",
|
||||
" # Define hover tool\n",
|
||||
" p.select_one(HoverTool).tooltips = [\n",
|
||||
" (\"Node IP Address\", \"@node_ip_address\"),\n",
|
||||
" (\"Number of tasks running\", \"@num_tasks\"),\n",
|
||||
" (\"Time\", \"@time\")\n",
|
||||
" ]\n",
|
||||
"\n",
|
||||
" # Define the axis labels \n",
|
||||
" p.xaxis.axis_label = \"Time in seconds\"\n",
|
||||
" p.yaxis.axis_label = \"Node IP Address\"\n",
|
||||
" handle = show(p, notebook_handle=True)\n",
|
||||
" workers = ray.global_state.workers()\n",
|
||||
" # Function to update the heat map \n",
|
||||
" def heat_map_update(abs_earliest, abs_latest, abs_num_tasks, tasks):\n",
|
||||
" if len(tasks) == 0:\n",
|
||||
" return\n",
|
||||
" \n",
|
||||
" granularity = 1\n",
|
||||
" earliest = time.time()\n",
|
||||
" latest = 0\n",
|
||||
" \n",
|
||||
" node_to_tasks = dict()\n",
|
||||
" # Determine which task has the earlest start time out of the ones passed into the \n",
|
||||
" # update function\n",
|
||||
" for task_id, data in tasks.items():\n",
|
||||
" if data[\"score\"] > latest:\n",
|
||||
" latest = data[\"score\"]\n",
|
||||
" if data[\"score\"] < earliest:\n",
|
||||
" earliest = data[\"score\"]\n",
|
||||
" worker_id = data[\"worker_id\"]\n",
|
||||
" node_ip = workers[worker_id][\"node_ip_address\"]\n",
|
||||
" if node_ip not in node_to_tasks: \n",
|
||||
" node_to_tasks[node_ip] = {}\n",
|
||||
" node_to_tasks[node_ip][task_id] = data\n",
|
||||
" \n",
|
||||
" nodes = []\n",
|
||||
" times = []\n",
|
||||
" lengths = []\n",
|
||||
" num_tasks = []\n",
|
||||
" \n",
|
||||
" for node_ip, task_dict in node_to_tasks.items():\n",
|
||||
" left, right, top = compute_utilizations(earliest, latest, abs_num_tasks, task_dict, 100, True)\n",
|
||||
" for (l, r, t) in zip(left, right, top):\n",
|
||||
" nodes.append(node_ip)\n",
|
||||
" times.append((l + r) / 2)\n",
|
||||
" lengths.append(r - l)\n",
|
||||
" num_tasks.append(t)\n",
|
||||
"\n",
|
||||
" # Set the y range of the plot to be the node IP addresses \n",
|
||||
" p.y_range.factors = list(set(nodes))\n",
|
||||
" \n",
|
||||
" mapper.low = min(min(num_tasks), 0)\n",
|
||||
" mapper.high = max(max(num_tasks), 1)\n",
|
||||
"\n",
|
||||
" # Update plot with new data based on slider and text box values \n",
|
||||
" source.data = {\"node_ip_address\": nodes, \"time\": times, \"num_tasks\": num_tasks, \"length\": lengths}\n",
|
||||
" \n",
|
||||
" push_notebook(handle=handle)\n",
|
||||
"\n",
|
||||
" get_sliders(heat_map_update)\n",
|
||||
" \n",
|
||||
"cluster_usage()"
|
||||
]
|
||||
}
|
||||
],
|
||||
"metadata": {
|
||||
|
||||
Reference in New Issue
Block a user