mirror of
https://github.com/wassname/ray.git
synced 2026-07-01 20:57:51 +08:00
UI functions in separate file. (#801)
* UI file. * Fixed linting. * Change UI instructions slightly.
This commit is contained in:
committed by
Robert Nishihara
parent
054ae4180e
commit
99badc7ae4
+36
-659
@@ -4,741 +4,118 @@
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"### Ray UI.\n",
|
||||
"# Ray UI\n",
|
||||
"\n",
|
||||
"#### To run the UI: \n",
|
||||
"1. Kernel -> Restart and Run All \n",
|
||||
"\n",
|
||||
"\n",
|
||||
"#### To hide the code:\n",
|
||||
"\n",
|
||||
"1. Install the hide_code extension.\n",
|
||||
"```\n",
|
||||
"pip install hide_code\n",
|
||||
"jupyter nbextension install --py hide_code\n",
|
||||
"jupyter nbextension enable --py hide_code\n",
|
||||
"jupyter serverextension enable --py hide_code\n",
|
||||
"```\n",
|
||||
"2. Go to View -> Cell Toolbar -> Hide Code. \n",
|
||||
"3. Press ESC then T"
|
||||
"Start the UI with **Kernel -> Restart and Run All**."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"import ipywidgets as widgets\n",
|
||||
"import numpy as np\n",
|
||||
"import os\n",
|
||||
"import pprint\n",
|
||||
"import ray\n",
|
||||
"import subprocess\n",
|
||||
"import tempfile\n",
|
||||
"import time\n",
|
||||
"\n",
|
||||
"from IPython.display import display\n",
|
||||
"import ray.experimental.ui as ui\n",
|
||||
"\n",
|
||||
"ray.init(redis_address=os.environ[\"REDIS_ADDRESS\"])"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Evaluate the box below to search for objects."
|
||||
"#### Object search."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"object_search = widgets.Text(\n",
|
||||
" value=\"\",\n",
|
||||
" placeholder=\"Object ID\",\n",
|
||||
" description=\"Search for an object:\",\n",
|
||||
" disabled=False\n",
|
||||
")\n",
|
||||
"display(object_search)\n",
|
||||
"\n",
|
||||
"def handle_submit(sender):\n",
|
||||
" pp = pprint.PrettyPrinter()\n",
|
||||
" pp.pprint(ray.global_state.object_table(object_search.value))\n",
|
||||
"\n",
|
||||
"object_search.on_submit(handle_submit)"
|
||||
"ui.object_search_bar()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Evaluate the box below to search for tasks."
|
||||
"#### Task search."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"task_search = widgets.Text(\n",
|
||||
" value=\"\",\n",
|
||||
" placeholder=\"Task ID\",\n",
|
||||
" description=\"Search for a task:\",\n",
|
||||
" disabled=False\n",
|
||||
")\n",
|
||||
"display(task_search)\n",
|
||||
"\n",
|
||||
"def handle_submit(sender):\n",
|
||||
" pp = pprint.PrettyPrinter()\n",
|
||||
" pp.pprint(ray.global_state.task_table(task_search.value))\n",
|
||||
"\n",
|
||||
"task_search.on_submit(handle_submit)"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": 1,
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"# Instances of this class maintains keep track of whether or not a\n",
|
||||
"# callback is currently executing. Since the execution of the callback\n",
|
||||
"# may trigger more calls to the callback, this is used to prevent infinite\n",
|
||||
"# recursions.\n",
|
||||
"class _EventRecursionContextManager(object):\n",
|
||||
" def __init__(self):\n",
|
||||
" self.should_recurse = True\n",
|
||||
" \n",
|
||||
" def __enter__(self):\n",
|
||||
" self.should_recurse = False\n",
|
||||
" \n",
|
||||
" def __exit__(self, *args):\n",
|
||||
" self.should_recurse = True\n",
|
||||
"\n",
|
||||
"total_time_value = \"% total time\"\n",
|
||||
"total_tasks_value = \"% total tasks\"\n",
|
||||
"\n",
|
||||
"# Function that returns instances of sliders and handles associated events.\n",
|
||||
"def get_sliders(update):\n",
|
||||
" # Start_box value indicates the desired start point of queried window.\n",
|
||||
" start_box = widgets.FloatText(\n",
|
||||
" description=\"Start Time:\",\n",
|
||||
" disabled=True,\n",
|
||||
" )\n",
|
||||
" \n",
|
||||
" # End_box value indicates the desired end point of queried window.\n",
|
||||
" end_box = widgets.FloatText(\n",
|
||||
" description=\"End Time:\",\n",
|
||||
" disabled=True,\n",
|
||||
" )\n",
|
||||
" \n",
|
||||
" # Percentage slider. Indicates either % of total time or total tasks\n",
|
||||
" # depending on what breakdown_opt is set to.\n",
|
||||
" range_slider = widgets.IntRangeSlider(\n",
|
||||
" value=[70, 100],\n",
|
||||
" min=0,\n",
|
||||
" max=100,\n",
|
||||
" step=1,\n",
|
||||
" description=\"%:\",\n",
|
||||
" continuous_update=False,\n",
|
||||
" orientation=\"horizontal\",\n",
|
||||
" readout=True,\n",
|
||||
" readout_format=\".0i%\",\n",
|
||||
" )\n",
|
||||
" \n",
|
||||
" # Indicates the number of tasks that the user wants to be returned. Is\n",
|
||||
" # disabled when the breakdown_opt value is set to total_time_value. \n",
|
||||
" num_tasks_box = widgets.IntText(\n",
|
||||
" description=\"Num Tasks:\",\n",
|
||||
" disabled=False\n",
|
||||
" )\n",
|
||||
" \n",
|
||||
" # Dropdown bar that lets the user choose between modifying % of total\n",
|
||||
" # time or total number of tasks.\n",
|
||||
" breakdown_opt = widgets.Dropdown(\n",
|
||||
" options=[total_time_value, total_tasks_value],\n",
|
||||
" value=total_tasks_value,\n",
|
||||
" description=\"Selection Options:\"\n",
|
||||
" )\n",
|
||||
" \n",
|
||||
" # Initially passed in to the update_wrapper function.\n",
|
||||
" INIT_EVENT = \"INIT\"\n",
|
||||
" \n",
|
||||
" # Create instance of context manager to determine whether callback is\n",
|
||||
" # currently executing \n",
|
||||
" out_recursion = _EventRecursionContextManager()\n",
|
||||
" \n",
|
||||
" def update_wrapper(event):\n",
|
||||
" # Feature received a callback, but it shouldn't be executed\n",
|
||||
" # because the callback was the result of a different feature\n",
|
||||
" # executing its callback based on user input. \n",
|
||||
" if not out_recursion.should_recurse:\n",
|
||||
" return\n",
|
||||
" \n",
|
||||
" # Feature received a callback and it should be executed because\n",
|
||||
" # the callback was the result of user input. \n",
|
||||
" with out_recursion:\n",
|
||||
" smallest, largest, num_tasks = ray.global_state._job_length()\n",
|
||||
" diff = largest - smallest\n",
|
||||
" if num_tasks is not 0: \n",
|
||||
" \n",
|
||||
" # Describes the initial values that the slider/text box\n",
|
||||
" # values should be set to.\n",
|
||||
" if event == INIT_EVENT:\n",
|
||||
" if breakdown_opt.value == total_tasks_value:\n",
|
||||
" num_tasks_box.value = -min(10000, num_tasks)\n",
|
||||
" range_slider.value = (int(100 - (100. * -num_tasks_box.value) / num_tasks), 100)\n",
|
||||
" else:\n",
|
||||
" low, high = map(lambda x: x / 100., range_slider.value)\n",
|
||||
" start_box.value = round(diff * low, 2)\n",
|
||||
" end_box.value = round(diff * high, 2)\n",
|
||||
" \n",
|
||||
" # Event was triggered by a change in the start_box value. \n",
|
||||
" elif event[\"owner\"] == start_box:\n",
|
||||
" if start_box.value > end_box.value:\n",
|
||||
" start_box.value = end_box.value\n",
|
||||
" elif start_box.value < 0:\n",
|
||||
" start_box.value = 0\n",
|
||||
" low, high = range_slider.value\n",
|
||||
" range_slider.value = (int((start_box.value * 100.) / diff), high)\n",
|
||||
" \n",
|
||||
" # Event was triggered by a change in the end_box value. \n",
|
||||
" elif event[\"owner\"] == end_box:\n",
|
||||
" if start_box.value > end_box.value:\n",
|
||||
" end_box.value = start_box.value\n",
|
||||
" elif end_box.value > diff:\n",
|
||||
" end_box.value = diff\n",
|
||||
" low, high = range_slider.value\n",
|
||||
" range_slider.value = (low, int((end_box.value * 100.) / diff))\n",
|
||||
" \n",
|
||||
" # Event was triggered by a change in the breakdown options\n",
|
||||
" # toggle.\n",
|
||||
" elif event[\"owner\"] == breakdown_opt:\n",
|
||||
" if breakdown_opt.value == total_tasks_value:\n",
|
||||
" start_box.disabled = True\n",
|
||||
" end_box.disabled = True\n",
|
||||
" num_tasks_box.disabled = False\n",
|
||||
" num_tasks_box.value = min(10000, num_tasks)\n",
|
||||
" range_slider.value = (int(100 - (100. * num_tasks_box.value) / num_tasks), 100)\n",
|
||||
" else:\n",
|
||||
" start_box.disabled = False\n",
|
||||
" end_box.disabled = False\n",
|
||||
" num_tasks_box.disabled = True\n",
|
||||
" range_slider.value = (int((start_box.value * 100.) / diff),\n",
|
||||
" int((end_box.value * 100.) / diff))\n",
|
||||
" \n",
|
||||
" # Event was triggered by a change in the range_slider\n",
|
||||
" # value.\n",
|
||||
" elif event[\"owner\"] == range_slider:\n",
|
||||
" low, high = map(lambda x: x / 100., range_slider.value)\n",
|
||||
" if breakdown_opt.value == total_tasks_value:\n",
|
||||
" old_low, old_high = event[\"old\"]\n",
|
||||
" new_low, new_high = event[\"new\"]\n",
|
||||
" if old_low != new_low:\n",
|
||||
" range_slider.value = (new_low, 100)\n",
|
||||
" num_tasks_box.value = -(100. - new_low) / 100. * num_tasks\n",
|
||||
" else:\n",
|
||||
" range_slider.value = (0, new_high)\n",
|
||||
" num_tasks_box.value = new_high / 100. * num_tasks\n",
|
||||
" else:\n",
|
||||
" start_box.value = round(diff * low, 2)\n",
|
||||
" end_box.value = round(diff * high, 2)\n",
|
||||
" \n",
|
||||
" # Event was triggered by a change in the num_tasks_box\n",
|
||||
" # value.\n",
|
||||
" elif event[\"owner\"] == num_tasks_box:\n",
|
||||
" if num_tasks_box.value > 0:\n",
|
||||
" range_slider.value = (0, int(100 * float(num_tasks_box.value) / num_tasks))\n",
|
||||
" elif num_tasks_box.value < 0:\n",
|
||||
" range_slider.value = (100 + int(100 * float(num_tasks_box.value) / num_tasks), 100)\n",
|
||||
" \n",
|
||||
" if not update:\n",
|
||||
" return\n",
|
||||
"\n",
|
||||
" diff = largest - smallest\n",
|
||||
" \n",
|
||||
" # Low and high are used to scale the times that are\n",
|
||||
" # queried to be relative to the absolute time. \n",
|
||||
" low, high = map(lambda x: x / 100., range_slider.value)\n",
|
||||
" \n",
|
||||
" # Queries to task_profiles based on the slider and text\n",
|
||||
" # box values.\n",
|
||||
" # (Querying based on the % total amount of time.) \n",
|
||||
" if breakdown_opt.value == total_time_value:\n",
|
||||
" tasks = ray.global_state.task_profiles(start=smallest + diff * low, end=smallest + diff * high)\n",
|
||||
" \n",
|
||||
" # (Querying based on % of total number of tasks that were\n",
|
||||
" # run.)\n",
|
||||
" elif breakdown_opt.value == total_tasks_value:\n",
|
||||
" if range_slider.value[0] == 0:\n",
|
||||
" tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * high), fwd=True)\n",
|
||||
" else:\n",
|
||||
" tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * (high - low)), fwd=False)\n",
|
||||
"\n",
|
||||
" update(smallest, largest, num_tasks, tasks)\n",
|
||||
" \n",
|
||||
" # Get updated values from a slider or text box, and update the rest of\n",
|
||||
" # them accordingly.\n",
|
||||
" range_slider.observe(update_wrapper, names=\"value\")\n",
|
||||
" breakdown_opt.observe(update_wrapper, names=\"value\")\n",
|
||||
" start_box.observe(update_wrapper, names=\"value\")\n",
|
||||
" end_box.observe(update_wrapper, names=\"value\")\n",
|
||||
" num_tasks_box.observe(update_wrapper, names=\"value\")\n",
|
||||
" \n",
|
||||
" # Initializes the sliders\n",
|
||||
" update_wrapper(INIT_EVENT)\n",
|
||||
" \n",
|
||||
" # Display sliders and search boxes \n",
|
||||
" display(start_box, end_box, range_slider, num_tasks_box, breakdown_opt)\n",
|
||||
" \n",
|
||||
" # Return the sliders and text boxes \n",
|
||||
" return start_box, end_box, range_slider, breakdown_opt"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"def task_timeline():\n",
|
||||
" path_input = widgets.Button(description=\"View task timeline\")\n",
|
||||
"\n",
|
||||
" breakdown_basic = \"Basic\"\n",
|
||||
" breakdown_task = \"Task Breakdowns\"\n",
|
||||
" \n",
|
||||
" breakdown_opt = widgets.Dropdown(\n",
|
||||
" options=[\"Basic\", \"Task Breakdowns\"],\n",
|
||||
" value=\"Basic\",\n",
|
||||
" description=\"View options:\",\n",
|
||||
" disabled=False,\n",
|
||||
" )\n",
|
||||
" \n",
|
||||
" start_box, end_box, range_slider, time_opt = get_sliders(False)\n",
|
||||
" display(breakdown_opt)\n",
|
||||
" display(path_input)\n",
|
||||
"\n",
|
||||
" def find_trace2html():\n",
|
||||
" trace2html = \"/tmp/ray/catapult/tracing/bin/trace2html\"\n",
|
||||
" # Clone the catapult repository if it doesn't exist. TODO(rkn): We\n",
|
||||
" # could do this in the build.sh script later on.\n",
|
||||
" if not os.path.exists(trace2html): \n",
|
||||
" cmd = [\"git\", \"clone\", \"https://github.com/catapult-project/catapult.git\", \"/tmp/ray/catapult\"]\n",
|
||||
" subprocess.check_output(cmd) \n",
|
||||
" print(\"Cloning catapult to /tmp/ray/catapult.\")\n",
|
||||
" assert os.path.exists(trace2html)\n",
|
||||
" return trace2html\n",
|
||||
"\n",
|
||||
" def handle_submit(sender):\n",
|
||||
" tmp = tempfile.mktemp() + \".json\"\n",
|
||||
" tmp2 = tempfile.mktemp() + \".html\"\n",
|
||||
" \n",
|
||||
" if breakdown_opt.value == breakdown_basic:\n",
|
||||
" breakdown = False\n",
|
||||
" elif breakdown_opt.value == breakdown_task:\n",
|
||||
" breakdown = True\n",
|
||||
" else:\n",
|
||||
" raise ValueError(\"Unexpected breakdown value '{}'\".format(breakdown_opt.value))\n",
|
||||
" \n",
|
||||
" low, high = map(lambda x: x / 100., range_slider.value)\n",
|
||||
" \n",
|
||||
" smallest, largest, num_tasks = ray.global_state._job_length()\n",
|
||||
" diff = largest - smallest\n",
|
||||
"\n",
|
||||
" if time_opt.value == total_time_value:\n",
|
||||
" tasks = ray.global_state.task_profiles(start=smallest + diff * low, end=smallest + diff * high)\n",
|
||||
" elif time_opt.value == total_tasks_value:\n",
|
||||
" if range_slider.value[0] == 0:\n",
|
||||
" tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * high), fwd=True)\n",
|
||||
" else:\n",
|
||||
" tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * (high - low)), fwd=False)\n",
|
||||
" else:\n",
|
||||
" raise ValueError(\"Unexpected time value '{}'\".format(time_opt.value))\n",
|
||||
" \n",
|
||||
" print(\"{} tasks to trace\".format(len(tasks)))\n",
|
||||
" print(\"Dumping task profiling data to \" + tmp)\n",
|
||||
" ray.global_state.dump_catapult_trace(tmp, tasks, breakdowns=breakdown)\n",
|
||||
" print(\"Converting chrome trace to \" + tmp2)\n",
|
||||
" trace2html = find_trace2html()\n",
|
||||
" # TODO(rkn): The trace2html script currently requires Python 2.\n",
|
||||
" # Remove this dependency.\n",
|
||||
" subprocess.check_output([\"python2\", trace2html, tmp, \"--output\", tmp2])\n",
|
||||
" # Open the timeline in Chrome. TODO(rkn): We should remove the\n",
|
||||
" # dependency on Chrome and use whatever browser is currently being\n",
|
||||
" # used. Note that this currently does not work when Ray is being\n",
|
||||
" # used on a cluster and the browser is running locally.\n",
|
||||
" print(\"Opening html file in browser...\")\n",
|
||||
" subprocess.Popen([\"open\", \"-a\", \"Google Chrome\", tmp2])\n",
|
||||
"\n",
|
||||
" path_input.on_click(handle_submit)\n",
|
||||
"\n",
|
||||
"task_timeline()"
|
||||
"ui.task_search_bar()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Task duration distribution plot."
|
||||
"#### Task trace timeline."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from bokeh.models import Range1d, ColumnDataSource\n",
|
||||
"from bokeh.layouts import gridplot\n",
|
||||
"from bokeh.plotting import figure, show, helpers\n",
|
||||
"from bokeh.io import output_notebook, push_notebook\n",
|
||||
"from bokeh.resources import CDN\n",
|
||||
"output_notebook(resources=CDN)\n",
|
||||
"\n",
|
||||
"\n",
|
||||
"def task_completion_time_distribution():\n",
|
||||
" # Create the Bokeh plot \n",
|
||||
" p = figure(title=\"Task Completion Time Distribution\",tools=[\"save\", \"hover\", \"wheel_zoom\", \"box_zoom\", \"pan\"],\n",
|
||||
" background_fill_color=\"#FFFFFF\", x_range=(0, 1), y_range = (0, 1))\n",
|
||||
" \n",
|
||||
" # Create the data source that the plot pulls from \n",
|
||||
" source = ColumnDataSource(data={\n",
|
||||
" \"top\": [],\n",
|
||||
" \"left\": [],\n",
|
||||
" \"right\": []\n",
|
||||
" })\n",
|
||||
" \n",
|
||||
" # Plot the histogram rectangles\n",
|
||||
" p.quad(top=\"top\", bottom=0, left=\"left\", right=\"right\", source=source,\n",
|
||||
" fill_color=\"#B3B3B3\", line_color=\"#033649\")\n",
|
||||
"\n",
|
||||
" # Label the plot axes \n",
|
||||
" p.xaxis.axis_label = \"Duration in seconds\"\n",
|
||||
" p.yaxis.axis_label = \"Number of tasks\"\n",
|
||||
"\n",
|
||||
" handle = show(gridplot(p, ncols=1, plot_width=500, plot_height=500, toolbar_location=\"below\"),\n",
|
||||
" notebook_handle=True)\n",
|
||||
"\n",
|
||||
" # Function to update the plot \n",
|
||||
" def task_completion_time_update(abs_earliest, abs_latest, abs_num_tasks, tasks):\n",
|
||||
" if len(tasks) == 0:\n",
|
||||
" return\n",
|
||||
" \n",
|
||||
" # Create the distribution to plot\n",
|
||||
" distr = []\n",
|
||||
" for task_id, data in tasks.items():\n",
|
||||
" distr.append(data[\"store_outputs_end\"] - data[\"get_arguments_start\"])\n",
|
||||
" \n",
|
||||
" # Create a histogram from the distribution \n",
|
||||
" top, bin_edges = np.histogram(distr, bins=\"auto\")\n",
|
||||
" left = bin_edges[:-1]\n",
|
||||
" right = bin_edges[1:]\n",
|
||||
"\n",
|
||||
" source.data = {\"top\": top, \"left\": left, \"right\": right}\n",
|
||||
" \n",
|
||||
" # Set the x and y ranges \n",
|
||||
" x_range = (min(left) if len(left) else 0, max(right) if len(right) else 1)\n",
|
||||
" y_range = (0, max(top) + 1 if len(top) else 1)\n",
|
||||
" \n",
|
||||
" x_range = helpers._get_range(x_range)\n",
|
||||
" p.x_range.start = x_range.start\n",
|
||||
" p.x_range.end = x_range.end\n",
|
||||
"\n",
|
||||
" y_range = helpers._get_range(y_range)\n",
|
||||
" p.y_range.start = y_range.start\n",
|
||||
" p.y_range.end = y_range.end\n",
|
||||
" \n",
|
||||
" # Push updates to the plot\n",
|
||||
" push_notebook(handle=handle)\n",
|
||||
" \n",
|
||||
" get_sliders(task_completion_time_update)\n",
|
||||
"\n",
|
||||
"task_completion_time_distribution()"
|
||||
"ui.task_timeline()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### CPU usage over time plot."
|
||||
"#### Task durations."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from bokeh.layouts import gridplot\n",
|
||||
"from bokeh.plotting import figure, show, helpers\n",
|
||||
"from bokeh.resources import CDN\n",
|
||||
"from bokeh.io import output_notebook, push_notebook\n",
|
||||
"from bokeh.models import Range1d, ColumnDataSource\n",
|
||||
"import numpy as np\n",
|
||||
"output_notebook(resources=CDN)\n",
|
||||
" \n",
|
||||
"# Parse the client table to determine how many CPUs are available\n",
|
||||
"num_cpus = 0 \n",
|
||||
"client_table = ray.global_state.client_table()\n",
|
||||
"for node_ip, client_list in client_table.items(): \n",
|
||||
" for client in client_list: \n",
|
||||
" if \"NumCPUs\" in client: \n",
|
||||
" num_cpus += client[\"NumCPUs\"]\n",
|
||||
"\n",
|
||||
"def compute_utilizations(abs_earliest, abs_latest, num_tasks, tasks, num_buckets, use_abs_times=False):\n",
|
||||
" \n",
|
||||
" if len(tasks) == 0:\n",
|
||||
" return [], [], []\n",
|
||||
" \n",
|
||||
" if use_abs_times:\n",
|
||||
" earliest_time = abs_earliest\n",
|
||||
" latest_time = abs_latest\n",
|
||||
" else:\n",
|
||||
" # Determine what the earliest and latest tasks are out of the ones that are passed in\n",
|
||||
" earliest_time = time.time()\n",
|
||||
" latest_time = 0\n",
|
||||
" for task_id, data in tasks.items():\n",
|
||||
" latest_time = max((latest_time, data[\"store_outputs_end\"]))\n",
|
||||
" earliest_time = min((earliest_time, data[\"get_arguments_start\"]))\n",
|
||||
" \n",
|
||||
" # Add some epsilon to latest_time to ensure that the end time of the last task\n",
|
||||
" # falls __within__ a bucket, and not on the edge\n",
|
||||
" latest_time += 1e-6\n",
|
||||
" \n",
|
||||
" # Compute average CPU utilization per time bucket by summing cpu-time per bucket\n",
|
||||
" bucket_time_length = (latest_time - earliest_time) / float(num_buckets)\n",
|
||||
" cpu_time = [0 for _ in range(num_buckets)]\n",
|
||||
" \n",
|
||||
" for data in tasks.values():\n",
|
||||
" task_start_time = data[\"get_arguments_start\"]\n",
|
||||
" task_end_time = data[\"store_outputs_end\"]\n",
|
||||
" \n",
|
||||
" start_bucket = int((task_start_time - earliest_time) / bucket_time_length)\n",
|
||||
" end_bucket = int((task_end_time - earliest_time) / bucket_time_length)\n",
|
||||
" # Walk over each time bucket that this task intersects, adding the amount of\n",
|
||||
" # time that the task intersects within each bucket\n",
|
||||
" for bucket_idx in range(start_bucket, end_bucket + 1):\n",
|
||||
" bucket_start_time = earliest_time + bucket_idx * bucket_time_length\n",
|
||||
" bucket_end_time = earliest_time + (bucket_idx + 1) * bucket_time_length\n",
|
||||
" \n",
|
||||
" task_start_time_within_bucket = max(task_start_time, bucket_start_time)\n",
|
||||
" task_end_time_within_bucket = min(task_end_time, bucket_end_time)\n",
|
||||
" task_cpu_time_within_bucket = task_end_time_within_bucket - task_start_time_within_bucket\n",
|
||||
" \n",
|
||||
" if bucket_idx > -1 and bucket_idx < num_buckets: \n",
|
||||
" cpu_time[bucket_idx] += task_cpu_time_within_bucket\n",
|
||||
" \n",
|
||||
" # Cpu_utilization is the average cpu utilization of the bucket, which is just\n",
|
||||
" # cpu_time divided by bucket_time_length\n",
|
||||
" cpu_utilization = list(map(lambda x: x / float(bucket_time_length), cpu_time))\n",
|
||||
" \n",
|
||||
" # Generate histogram bucket edges. Subtract out abs_earliest to get relative time\n",
|
||||
" all_edges = [earliest_time - abs_earliest + i * bucket_time_length for i in range(num_buckets + 1)]\n",
|
||||
" # Left edges are all but the rightmost edge, right edges are all but the leftmost edge\n",
|
||||
" left_edges = all_edges[:-1]\n",
|
||||
" right_edges = all_edges[1:]\n",
|
||||
" \n",
|
||||
" return left_edges, right_edges, cpu_utilization\n",
|
||||
" \n",
|
||||
"\n",
|
||||
"# Update the plot based on the sliders\n",
|
||||
"def plot_utilization():\n",
|
||||
" # Create the Bokeh plot\n",
|
||||
" time_series_fig = figure(title=\"CPU Utilization\",\n",
|
||||
" tools=[\"save\", \"hover\", \"wheel_zoom\", \"box_zoom\", \"pan\"],\n",
|
||||
" background_fill_color=\"#FFFFFF\", x_range=[0, 1], y_range=[0, 1])\n",
|
||||
" \n",
|
||||
" # Create the data source that the plot will pull from\n",
|
||||
" time_series_source = ColumnDataSource(data=dict(\n",
|
||||
" left=[],\n",
|
||||
" right=[],\n",
|
||||
" top=[]\n",
|
||||
" ))\n",
|
||||
" \n",
|
||||
" # Plot the rectangles representing the distribution\n",
|
||||
" time_series_fig.quad(left=\"left\", right=\"right\", top=\"top\", bottom=0,\n",
|
||||
" source=time_series_source, fill_color=\"#B3B3B3\", line_color=\"#033649\")\n",
|
||||
" \n",
|
||||
" # Label the plot axes\n",
|
||||
" time_series_fig.xaxis.axis_label = \"Time in seconds\"\n",
|
||||
" time_series_fig.yaxis.axis_label = \"Number of CPUs used\"\n",
|
||||
" \n",
|
||||
" handle = show(gridplot(time_series_fig, ncols=1, plot_width=500, plot_height=500, toolbar_location=\"below\"),\n",
|
||||
" notebook_handle=True)\n",
|
||||
" \n",
|
||||
" def update_plot(abs_earliest, abs_latest, abs_num_tasks, tasks):\n",
|
||||
" num_buckets = 100\n",
|
||||
" left, right, top = compute_utilizations(abs_earliest, abs_latest, abs_num_tasks, tasks, num_buckets)\n",
|
||||
" \n",
|
||||
" time_series_source.data = {\"left\": left, \"right\": right, \"top\": top}\n",
|
||||
" \n",
|
||||
" x_range = (max(0, min(left)) if len(left) else 0, max(right) if len(right) else 1)\n",
|
||||
" y_range = (0, max(top) + 1 if len(top) else 1)\n",
|
||||
" \n",
|
||||
" # Define the axis ranges\n",
|
||||
" x_range = helpers._get_range(x_range)\n",
|
||||
" time_series_fig.x_range.start = x_range.start\n",
|
||||
" time_series_fig.x_range.end = x_range.end\n",
|
||||
" \n",
|
||||
" y_range = helpers._get_range(y_range)\n",
|
||||
" time_series_fig.y_range.start = y_range.start\n",
|
||||
" time_series_fig.y_range.end = num_cpus\n",
|
||||
" \n",
|
||||
" # Push the updated data to the notebook\n",
|
||||
" push_notebook(handle=handle)\n",
|
||||
" \n",
|
||||
" get_sliders(update_plot)\n",
|
||||
"\n",
|
||||
"plot_utilization()"
|
||||
"ui.task_completion_time_distribution()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Cluster usage over time \"heat map\"."
|
||||
"#### CPU usage."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {
|
||||
"collapsed": true
|
||||
},
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"from bokeh.io import show, output_notebook, push_notebook\n",
|
||||
"from bokeh.resources import CDN\n",
|
||||
"from bokeh.plotting import figure, helpers\n",
|
||||
"from bokeh.models import (\n",
|
||||
" ColumnDataSource,\n",
|
||||
" HoverTool,\n",
|
||||
" LinearColorMapper,\n",
|
||||
" BasicTicker,\n",
|
||||
" PrintfTickFormatter,\n",
|
||||
" ColorBar,\n",
|
||||
")\n",
|
||||
"output_notebook(resources=CDN)\n",
|
||||
"\n",
|
||||
"# Function to create the cluster usage \"heat map\" \n",
|
||||
"def cluster_usage():\n",
|
||||
" \n",
|
||||
" # Initial values \n",
|
||||
" source = ColumnDataSource(data={\"node_ip_address\":['127.0.0.1'], \"time\":['0.5'], \"num_tasks\":['1'], \"length\": [1]})\n",
|
||||
"\n",
|
||||
" # Define the color schema \n",
|
||||
" colors = [\"#75968f\", \"#a5bab7\", \"#c9d9d3\", \"#e2e2e2\", \"#dfccce\", \"#ddb7b1\", \"#cc7878\", \"#933b41\", \"#550b1d\"]\n",
|
||||
" mapper = LinearColorMapper(palette=colors, low=0, high=2)\n",
|
||||
"\n",
|
||||
" TOOLS = \"hover, save, xpan, box_zoom, reset, xwheel_zoom\"\n",
|
||||
" \n",
|
||||
" # Create the plot \n",
|
||||
" p = figure(title=\"Cluster Usage\", y_range=list(set(source.data['node_ip_address'])),\n",
|
||||
" x_axis_location=\"above\", plot_width=900, plot_height=500,\n",
|
||||
" tools=TOOLS, toolbar_location='below')\n",
|
||||
"\n",
|
||||
" # Format the plot axes \n",
|
||||
" p.grid.grid_line_color = None\n",
|
||||
" p.axis.axis_line_color = None\n",
|
||||
" p.axis.major_tick_line_color = None\n",
|
||||
" p.axis.major_label_text_font_size = \"10pt\"\n",
|
||||
" p.axis.major_label_standoff = 0\n",
|
||||
" p.xaxis.major_label_orientation = np.pi / 3\n",
|
||||
"\n",
|
||||
" # Plot rectangles\n",
|
||||
" p.rect(x=\"time\", y=\"node_ip_address\", width=\"length\", height=1,\n",
|
||||
" source=source,\n",
|
||||
" fill_color={\"field\": \"num_tasks\", \"transform\": mapper},\n",
|
||||
" line_color=None)\n",
|
||||
"\n",
|
||||
" # Add legend to the side of the plot\n",
|
||||
" color_bar = ColorBar(color_mapper=mapper, major_label_text_font_size=\"8pt\",\n",
|
||||
" ticker=BasicTicker(desired_num_ticks=len(colors)),\n",
|
||||
" label_standoff=6, border_line_color=None, location=(0, 0))\n",
|
||||
" p.add_layout(color_bar, \"right\")\n",
|
||||
"\n",
|
||||
" # Define hover tool\n",
|
||||
" p.select_one(HoverTool).tooltips = [\n",
|
||||
" (\"Node IP Address\", \"@node_ip_address\"),\n",
|
||||
" (\"Number of tasks running\", \"@num_tasks\"),\n",
|
||||
" (\"Time\", \"@time\")\n",
|
||||
" ]\n",
|
||||
"\n",
|
||||
" # Define the axis labels \n",
|
||||
" p.xaxis.axis_label = \"Time in seconds\"\n",
|
||||
" p.yaxis.axis_label = \"Node IP Address\"\n",
|
||||
" handle = show(p, notebook_handle=True)\n",
|
||||
" workers = ray.global_state.workers()\n",
|
||||
" # Function to update the heat map \n",
|
||||
" def heat_map_update(abs_earliest, abs_latest, abs_num_tasks, tasks):\n",
|
||||
" if len(tasks) == 0:\n",
|
||||
" return\n",
|
||||
" \n",
|
||||
" granularity = 1\n",
|
||||
" earliest = time.time()\n",
|
||||
" latest = 0\n",
|
||||
" \n",
|
||||
" node_to_tasks = dict()\n",
|
||||
" # Determine which task has the earlest start time out of the ones passed into the \n",
|
||||
" # update function\n",
|
||||
" for task_id, data in tasks.items():\n",
|
||||
" if data[\"score\"] > latest:\n",
|
||||
" latest = data[\"score\"]\n",
|
||||
" if data[\"score\"] < earliest:\n",
|
||||
" earliest = data[\"score\"]\n",
|
||||
" worker_id = data[\"worker_id\"]\n",
|
||||
" node_ip = workers[worker_id][\"node_ip_address\"]\n",
|
||||
" if node_ip not in node_to_tasks: \n",
|
||||
" node_to_tasks[node_ip] = {}\n",
|
||||
" node_to_tasks[node_ip][task_id] = data\n",
|
||||
" \n",
|
||||
" nodes = []\n",
|
||||
" times = []\n",
|
||||
" lengths = []\n",
|
||||
" num_tasks = []\n",
|
||||
" \n",
|
||||
" for node_ip, task_dict in node_to_tasks.items():\n",
|
||||
" left, right, top = compute_utilizations(earliest, latest, abs_num_tasks, task_dict, 100, True)\n",
|
||||
" for (l, r, t) in zip(left, right, top):\n",
|
||||
" nodes.append(node_ip)\n",
|
||||
" times.append((l + r) / 2)\n",
|
||||
" lengths.append(r - l)\n",
|
||||
" num_tasks.append(t)\n",
|
||||
"\n",
|
||||
" # Set the y range of the plot to be the node IP addresses \n",
|
||||
" p.y_range.factors = list(set(nodes))\n",
|
||||
" \n",
|
||||
" mapper.low = min(min(num_tasks), 0)\n",
|
||||
" mapper.high = max(max(num_tasks), 1)\n",
|
||||
"\n",
|
||||
" # Update plot with new data based on slider and text box values \n",
|
||||
" source.data = {\"node_ip_address\": nodes, \"time\": times, \"num_tasks\": num_tasks, \"length\": lengths}\n",
|
||||
" \n",
|
||||
" push_notebook(handle=handle)\n",
|
||||
"\n",
|
||||
" get_sliders(heat_map_update)\n",
|
||||
" \n",
|
||||
"cluster_usage()"
|
||||
"ui.cpu_usage()"
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "markdown",
|
||||
"metadata": {},
|
||||
"source": [
|
||||
"#### Cluster usage."
|
||||
]
|
||||
},
|
||||
{
|
||||
"cell_type": "code",
|
||||
"execution_count": null,
|
||||
"metadata": {},
|
||||
"outputs": [],
|
||||
"source": [
|
||||
"ui.cluster_usage()"
|
||||
]
|
||||
}
|
||||
],
|
||||
|
||||
@@ -0,0 +1,722 @@
|
||||
import ipywidgets as widgets
|
||||
import numpy as np
|
||||
import os
|
||||
import pprint
|
||||
import ray
|
||||
import subprocess
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
from IPython.display import display
|
||||
|
||||
# Instances of this class maintains keep track of whether or not a
|
||||
# callback is currently executing. Since the execution of the callback
|
||||
# may trigger more calls to the callback, this is used to prevent infinite
|
||||
# recursions.
|
||||
|
||||
|
||||
class _EventRecursionContextManager(object):
|
||||
def __init__(self):
|
||||
self.should_recurse = True
|
||||
|
||||
def __enter__(self):
|
||||
self.should_recurse = False
|
||||
|
||||
def __exit__(self, *args):
|
||||
self.should_recurse = True
|
||||
|
||||
|
||||
total_time_value = "% total time"
|
||||
total_tasks_value = "% total tasks"
|
||||
|
||||
|
||||
# Function that returns instances of sliders and handles associated events.
|
||||
|
||||
def get_sliders(update):
|
||||
# Start_box value indicates the desired start point of queried window.
|
||||
start_box = widgets.FloatText(
|
||||
description="Start Time:",
|
||||
disabled=True,
|
||||
)
|
||||
|
||||
# End_box value indicates the desired end point of queried window.
|
||||
end_box = widgets.FloatText(
|
||||
description="End Time:",
|
||||
disabled=True,
|
||||
)
|
||||
|
||||
# Percentage slider. Indicates either % of total time or total tasks
|
||||
# depending on what breakdown_opt is set to.
|
||||
range_slider = widgets.IntRangeSlider(
|
||||
value=[70, 100],
|
||||
min=0,
|
||||
max=100,
|
||||
step=1,
|
||||
description="%:",
|
||||
continuous_update=False,
|
||||
orientation="horizontal",
|
||||
readout=True,
|
||||
readout_format=".0i%",
|
||||
)
|
||||
|
||||
# Indicates the number of tasks that the user wants to be returned. Is
|
||||
# disabled when the breakdown_opt value is set to total_time_value.
|
||||
num_tasks_box = widgets.IntText(
|
||||
description="Num Tasks:",
|
||||
disabled=False
|
||||
)
|
||||
|
||||
# Dropdown bar that lets the user choose between modifying % of total
|
||||
# time or total number of tasks.
|
||||
breakdown_opt = widgets.Dropdown(
|
||||
options=[total_time_value, total_tasks_value],
|
||||
value=total_tasks_value,
|
||||
description="Selection Options:"
|
||||
)
|
||||
|
||||
# Initially passed in to the update_wrapper function.
|
||||
INIT_EVENT = "INIT"
|
||||
|
||||
# Create instance of context manager to determine whether callback is
|
||||
# currently executing
|
||||
out_recursion = _EventRecursionContextManager()
|
||||
|
||||
def update_wrapper(event):
|
||||
# Feature received a callback, but it shouldn't be executed
|
||||
# because the callback was the result of a different feature
|
||||
# executing its callback based on user input.
|
||||
if not out_recursion.should_recurse:
|
||||
return
|
||||
|
||||
# Feature received a callback and it should be executed because
|
||||
# the callback was the result of user input.
|
||||
with out_recursion:
|
||||
smallest, largest, num_tasks = ray.global_state._job_length()
|
||||
diff = largest - smallest
|
||||
if num_tasks is not 0:
|
||||
|
||||
# Describes the initial values that the slider/text box
|
||||
# values should be set to.
|
||||
if event == INIT_EVENT:
|
||||
if breakdown_opt.value == total_tasks_value:
|
||||
num_tasks_box.value = -min(10000, num_tasks)
|
||||
range_slider.value = (int(100 -
|
||||
(100. * -num_tasks_box.value)
|
||||
/ num_tasks), 100)
|
||||
else:
|
||||
low, high = map(lambda x: x / 100., range_slider.value)
|
||||
start_box.value = round(diff * low, 2)
|
||||
end_box.value = round(diff * high, 2)
|
||||
|
||||
# Event was triggered by a change in the start_box value.
|
||||
elif event["owner"] == start_box:
|
||||
if start_box.value > end_box.value:
|
||||
start_box.value = end_box.value
|
||||
elif start_box.value < 0:
|
||||
start_box.value = 0
|
||||
low, high = range_slider.value
|
||||
range_slider.value = (int((start_box.value * 100.)
|
||||
/ diff), high)
|
||||
|
||||
# Event was triggered by a change in the end_box value.
|
||||
elif event["owner"] == end_box:
|
||||
if start_box.value > end_box.value:
|
||||
end_box.value = start_box.value
|
||||
elif end_box.value > diff:
|
||||
end_box.value = diff
|
||||
low, high = range_slider.value
|
||||
range_slider.value = (low, int((end_box.value * 100.)
|
||||
/ diff))
|
||||
|
||||
# Event was triggered by a change in the breakdown options
|
||||
# toggle.
|
||||
elif event["owner"] == breakdown_opt:
|
||||
if breakdown_opt.value == total_tasks_value:
|
||||
start_box.disabled = True
|
||||
end_box.disabled = True
|
||||
num_tasks_box.disabled = False
|
||||
num_tasks_box.value = min(10000, num_tasks)
|
||||
range_slider.value = (int(100 -
|
||||
(100. * num_tasks_box.value)
|
||||
/ num_tasks), 100)
|
||||
else:
|
||||
start_box.disabled = False
|
||||
end_box.disabled = False
|
||||
num_tasks_box.disabled = True
|
||||
range_slider.value = (int((start_box.value * 100.)
|
||||
/ diff),
|
||||
int((end_box.value * 100.)
|
||||
/ diff))
|
||||
|
||||
# Event was triggered by a change in the range_slider
|
||||
# value.
|
||||
elif event["owner"] == range_slider:
|
||||
low, high = map(lambda x: x / 100., range_slider.value)
|
||||
if breakdown_opt.value == total_tasks_value:
|
||||
old_low, old_high = event["old"]
|
||||
new_low, new_high = event["new"]
|
||||
if old_low != new_low:
|
||||
range_slider.value = (new_low, 100)
|
||||
num_tasks_box.value = (-(100. - new_low)
|
||||
/ 100. * num_tasks)
|
||||
else:
|
||||
range_slider.value = (0, new_high)
|
||||
num_tasks_box.value = new_high / 100. * num_tasks
|
||||
else:
|
||||
start_box.value = round(diff * low, 2)
|
||||
end_box.value = round(diff * high, 2)
|
||||
|
||||
# Event was triggered by a change in the num_tasks_box
|
||||
# value.
|
||||
elif event["owner"] == num_tasks_box:
|
||||
if num_tasks_box.value > 0:
|
||||
range_slider.value = (0, int(100 *
|
||||
float(num_tasks_box.value)
|
||||
/ num_tasks))
|
||||
elif num_tasks_box.value < 0:
|
||||
range_slider.value = (100 +
|
||||
int(100 *
|
||||
float(num_tasks_box.value)
|
||||
/ num_tasks), 100)
|
||||
|
||||
if not update:
|
||||
return
|
||||
|
||||
diff = largest - smallest
|
||||
|
||||
# Low and high are used to scale the times that are
|
||||
# queried to be relative to the absolute time.
|
||||
low, high = map(lambda x: x / 100., range_slider.value)
|
||||
|
||||
# Queries to task_profiles based on the slider and text
|
||||
# box values.
|
||||
# (Querying based on the % total amount of time.)
|
||||
if breakdown_opt.value == total_time_value:
|
||||
tasks = ray.global_state.task_profiles(start=(smallest +
|
||||
diff * low),
|
||||
end=(smallest
|
||||
+ diff * high))
|
||||
|
||||
# (Querying based on % of total number of tasks that were
|
||||
# run.)
|
||||
elif breakdown_opt.value == total_tasks_value:
|
||||
if range_slider.value[0] == 0:
|
||||
tasks = ray.global_state.task_profiles(num_tasks=(int(
|
||||
num_tasks
|
||||
* high)),
|
||||
fwd=True)
|
||||
else:
|
||||
tasks = ray.global_state.task_profiles(num_tasks=(int(
|
||||
num_tasks *
|
||||
(high - low))),
|
||||
fwd=False)
|
||||
|
||||
update(smallest, largest, num_tasks, tasks)
|
||||
|
||||
# Get updated values from a slider or text box, and update the rest of
|
||||
# them accordingly.
|
||||
range_slider.observe(update_wrapper, names="value")
|
||||
breakdown_opt.observe(update_wrapper, names="value")
|
||||
start_box.observe(update_wrapper, names="value")
|
||||
end_box.observe(update_wrapper, names="value")
|
||||
num_tasks_box.observe(update_wrapper, names="value")
|
||||
|
||||
# Initializes the sliders
|
||||
update_wrapper(INIT_EVENT)
|
||||
|
||||
# Display sliders and search boxes
|
||||
display(start_box, end_box, range_slider, num_tasks_box, breakdown_opt)
|
||||
|
||||
# Return the sliders and text boxes
|
||||
return start_box, end_box, range_slider, breakdown_opt
|
||||
|
||||
|
||||
def object_search_bar():
|
||||
object_search = widgets.Text(
|
||||
value="",
|
||||
placeholder="Object ID",
|
||||
description="Search for an object:",
|
||||
disabled=False
|
||||
)
|
||||
display(object_search)
|
||||
|
||||
def handle_submit(sender):
|
||||
pp = pprint.PrettyPrinter()
|
||||
pp.pprint(ray.global_state.object_table(object_search.value))
|
||||
|
||||
object_search.on_submit(handle_submit)
|
||||
|
||||
|
||||
def task_search_bar():
|
||||
task_search = widgets.Text(
|
||||
value="",
|
||||
placeholder="Task ID",
|
||||
description="Search for a task:",
|
||||
disabled=False
|
||||
)
|
||||
display(task_search)
|
||||
|
||||
def handle_submit(sender):
|
||||
pp = pprint.PrettyPrinter()
|
||||
pp.pprint(ray.global_state.task_table(task_search.value))
|
||||
|
||||
task_search.on_submit(handle_submit)
|
||||
|
||||
|
||||
def task_timeline():
|
||||
path_input = widgets.Button(description="View task timeline")
|
||||
|
||||
breakdown_basic = "Basic"
|
||||
breakdown_task = "Task Breakdowns"
|
||||
|
||||
breakdown_opt = widgets.Dropdown(
|
||||
options=["Basic", "Task Breakdowns"],
|
||||
value="Basic",
|
||||
description="View options:",
|
||||
disabled=False,
|
||||
)
|
||||
|
||||
start_box, end_box, range_slider, time_opt = get_sliders(False)
|
||||
# display(breakdown_opt)
|
||||
# display(path_input)
|
||||
|
||||
def find_trace2html():
|
||||
trace2html = "/tmp/ray/catapult/tracing/bin/trace2html"
|
||||
# Clone the catapult repository if it doesn't exist. TODO(rkn): We
|
||||
# could do this in the build.sh script later on.
|
||||
if not os.path.exists(trace2html):
|
||||
cmd = ["git",
|
||||
"clone",
|
||||
"https://github.com/catapult-project/catapult.git",
|
||||
"/tmp/ray/catapult"]
|
||||
subprocess.check_output(cmd)
|
||||
print("Cloning catapult to /tmp/ray/catapult.")
|
||||
assert os.path.exists(trace2html)
|
||||
return trace2html
|
||||
|
||||
def handle_submit(sender):
|
||||
tmp = tempfile.mktemp() + ".json"
|
||||
tmp2 = tempfile.mktemp() + ".html"
|
||||
|
||||
if breakdown_opt.value == breakdown_basic:
|
||||
breakdown = False
|
||||
elif breakdown_opt.value == breakdown_task:
|
||||
breakdown = True
|
||||
else:
|
||||
raise ValueError(
|
||||
"Unexpected breakdown value '{}'".format(breakdown_opt.value))
|
||||
|
||||
low, high = map(lambda x: x / 100., range_slider.value)
|
||||
|
||||
smallest, largest, num_tasks = ray.global_state._job_length()
|
||||
diff = largest - smallest
|
||||
|
||||
if time_opt.value == total_time_value:
|
||||
tasks = ray.global_state.task_profiles(
|
||||
start=smallest + diff * low,
|
||||
end=smallest + diff * high)
|
||||
elif time_opt.value == total_tasks_value:
|
||||
if range_slider.value[0] == 0:
|
||||
tasks = ray.global_state.task_profiles(
|
||||
num_tasks=int(num_tasks * high),
|
||||
fwd=True)
|
||||
else:
|
||||
tasks = ray.global_state.task_profiles(
|
||||
num_tasks=int(num_tasks * (high - low)),
|
||||
fwd=False)
|
||||
else:
|
||||
raise ValueError(
|
||||
"Unexpected time value '{}'".format(time_opt.value))
|
||||
|
||||
print("{} tasks to trace".format(len(tasks)))
|
||||
print("Dumping task profiling data to " + tmp)
|
||||
ray.global_state.dump_catapult_trace(tmp,
|
||||
tasks,
|
||||
breakdowns=breakdown)
|
||||
print("Converting chrome trace to " + tmp2)
|
||||
trace2html = find_trace2html()
|
||||
# TODO(rkn): The trace2html script currently requires Python 2.
|
||||
# Remove this dependency.
|
||||
subprocess.check_output(["python2",
|
||||
trace2html,
|
||||
tmp,
|
||||
"--output",
|
||||
tmp2])
|
||||
# Open the timeline in Chrome. TODO(rkn): We should remove the
|
||||
# dependency on Chrome and use whatever browser is currently being
|
||||
# used. Note that this currently does not work when Ray is being
|
||||
# used on a cluster and the browser is running locally.
|
||||
print("Opening html file in browser...")
|
||||
subprocess.Popen(["open", "-a", "Google Chrome", tmp2])
|
||||
|
||||
path_input.on_click(handle_submit)
|
||||
|
||||
|
||||
def task_completion_time_distribution():
|
||||
from bokeh.models import ColumnDataSource
|
||||
from bokeh.layouts import gridplot
|
||||
from bokeh.plotting import figure, show, helpers
|
||||
from bokeh.io import output_notebook, push_notebook
|
||||
from bokeh.resources import CDN
|
||||
output_notebook(resources=CDN)
|
||||
|
||||
# Create the Bokeh plot
|
||||
p = figure(title="Task Completion Time Distribution",
|
||||
tools=["save", "hover", "wheel_zoom", "box_zoom", "pan"],
|
||||
background_fill_color="#FFFFFF",
|
||||
x_range=(0, 1),
|
||||
y_range=(0, 1))
|
||||
|
||||
# Create the data source that the plot pulls from
|
||||
source = ColumnDataSource(data={
|
||||
"top": [],
|
||||
"left": [],
|
||||
"right": []
|
||||
})
|
||||
|
||||
# Plot the histogram rectangles
|
||||
p.quad(top="top", bottom=0, left="left", right="right", source=source,
|
||||
fill_color="#B3B3B3", line_color="#033649")
|
||||
|
||||
# Label the plot axes
|
||||
p.xaxis.axis_label = "Duration in seconds"
|
||||
p.yaxis.axis_label = "Number of tasks"
|
||||
|
||||
handle = show(gridplot(p, ncols=1,
|
||||
plot_width=500,
|
||||
plot_height=500,
|
||||
toolbar_location="below"), notebook_handle=True)
|
||||
|
||||
# Function to update the plot
|
||||
def task_completion_time_update(abs_earliest,
|
||||
abs_latest,
|
||||
abs_num_tasks,
|
||||
tasks):
|
||||
if len(tasks) == 0:
|
||||
return
|
||||
|
||||
# Create the distribution to plot
|
||||
distr = []
|
||||
for task_id, data in tasks.items():
|
||||
distr.append(data["store_outputs_end"] -
|
||||
data["get_arguments_start"])
|
||||
|
||||
# Create a histogram from the distribution
|
||||
top, bin_edges = np.histogram(distr, bins="auto")
|
||||
left = bin_edges[:-1]
|
||||
right = bin_edges[1:]
|
||||
|
||||
source.data = {"top": top, "left": left, "right": right}
|
||||
|
||||
# Set the x and y ranges
|
||||
x_range = (min(left) if len(left) else 0,
|
||||
max(right) if len(right) else 1)
|
||||
y_range = (0, max(top) + 1 if len(top) else 1)
|
||||
|
||||
x_range = helpers._get_range(x_range)
|
||||
p.x_range.start = x_range.start
|
||||
p.x_range.end = x_range.end
|
||||
|
||||
y_range = helpers._get_range(y_range)
|
||||
p.y_range.start = y_range.start
|
||||
p.y_range.end = y_range.end
|
||||
|
||||
# Push updates to the plot
|
||||
push_notebook(handle=handle)
|
||||
|
||||
get_sliders(task_completion_time_update)
|
||||
|
||||
|
||||
def compute_utilizations(abs_earliest,
|
||||
abs_latest,
|
||||
num_tasks,
|
||||
tasks,
|
||||
num_buckets,
|
||||
use_abs_times=False):
|
||||
if len(tasks) == 0:
|
||||
return [], [], []
|
||||
|
||||
if use_abs_times:
|
||||
earliest_time = abs_earliest
|
||||
latest_time = abs_latest
|
||||
else:
|
||||
# Determine what the earliest and latest tasks are out of the ones
|
||||
# that are passed in
|
||||
earliest_time = time.time()
|
||||
latest_time = 0
|
||||
for task_id, data in tasks.items():
|
||||
latest_time = max((latest_time, data["store_outputs_end"]))
|
||||
earliest_time = min((earliest_time,
|
||||
data["get_arguments_start"]))
|
||||
|
||||
# Add some epsilon to latest_time to ensure that the end time of the
|
||||
# last task falls __within__ a bucket, and not on the edge
|
||||
latest_time += 1e-6
|
||||
|
||||
# Compute average CPU utilization per time bucket by summing
|
||||
# cpu-time per bucket
|
||||
bucket_time_length = (latest_time - earliest_time) / float(num_buckets)
|
||||
cpu_time = [0 for _ in range(num_buckets)]
|
||||
|
||||
for data in tasks.values():
|
||||
task_start_time = data["get_arguments_start"]
|
||||
task_end_time = data["store_outputs_end"]
|
||||
|
||||
start_bucket = int((task_start_time - earliest_time)
|
||||
/ bucket_time_length)
|
||||
end_bucket = int((task_end_time - earliest_time)
|
||||
/ bucket_time_length)
|
||||
# Walk over each time bucket that this task intersects, adding the
|
||||
# amount of time that the task intersects within each bucket
|
||||
for bucket_idx in range(start_bucket, end_bucket + 1):
|
||||
bucket_start_time = (earliest_time + bucket_idx
|
||||
* bucket_time_length)
|
||||
bucket_end_time = (earliest_time + (bucket_idx + 1)
|
||||
* bucket_time_length)
|
||||
|
||||
task_start_time_within_bucket = max(task_start_time,
|
||||
bucket_start_time)
|
||||
task_end_time_within_bucket = min(task_end_time,
|
||||
bucket_end_time)
|
||||
task_cpu_time_within_bucket = (task_end_time_within_bucket -
|
||||
task_start_time_within_bucket)
|
||||
|
||||
if bucket_idx > -1 and bucket_idx < num_buckets:
|
||||
cpu_time[bucket_idx] += task_cpu_time_within_bucket
|
||||
|
||||
# Cpu_utilization is the average cpu utilization of the bucket, which
|
||||
# is just cpu_time divided by bucket_time_length.
|
||||
cpu_utilization = list(map(lambda x: x / float(bucket_time_length),
|
||||
cpu_time))
|
||||
|
||||
# Generate histogram bucket edges. Subtract out abs_earliest to get
|
||||
# relative time.
|
||||
all_edges = [earliest_time - abs_earliest + i * bucket_time_length
|
||||
for i in range(num_buckets + 1)]
|
||||
# Left edges are all but the rightmost edge, right edges are all but
|
||||
# the leftmost edge.
|
||||
left_edges = all_edges[:-1]
|
||||
right_edges = all_edges[1:]
|
||||
|
||||
return left_edges, right_edges, cpu_utilization
|
||||
|
||||
|
||||
def cpu_usage():
|
||||
from bokeh.layouts import gridplot
|
||||
from bokeh.plotting import figure, show, helpers
|
||||
from bokeh.resources import CDN
|
||||
from bokeh.io import output_notebook, push_notebook
|
||||
from bokeh.models import ColumnDataSource
|
||||
output_notebook(resources=CDN)
|
||||
|
||||
# Parse the client table to determine how many CPUs are available
|
||||
num_cpus = 0
|
||||
client_table = ray.global_state.client_table()
|
||||
for node_ip, client_list in client_table.items():
|
||||
for client in client_list:
|
||||
if "NumCPUs" in client:
|
||||
num_cpus += client["NumCPUs"]
|
||||
|
||||
# Update the plot based on the sliders
|
||||
def plot_utilization():
|
||||
# Create the Bokeh plot
|
||||
time_series_fig = figure(title="CPU Utilization",
|
||||
tools=["save", "hover", "wheel_zoom",
|
||||
"box_zoom", "pan"],
|
||||
background_fill_color="#FFFFFF",
|
||||
x_range=[0, 1],
|
||||
y_range=[0, 1])
|
||||
|
||||
# Create the data source that the plot will pull from
|
||||
time_series_source = ColumnDataSource(data=dict(
|
||||
left=[],
|
||||
right=[],
|
||||
top=[]
|
||||
))
|
||||
|
||||
# Plot the rectangles representing the distribution
|
||||
time_series_fig.quad(left="left",
|
||||
right="right",
|
||||
top="top",
|
||||
bottom=0,
|
||||
source=time_series_source,
|
||||
fill_color="#B3B3B3",
|
||||
line_color="#033649")
|
||||
|
||||
# Label the plot axes
|
||||
time_series_fig.xaxis.axis_label = "Time in seconds"
|
||||
time_series_fig.yaxis.axis_label = "Number of CPUs used"
|
||||
|
||||
handle = show(gridplot(time_series_fig,
|
||||
ncols=1,
|
||||
plot_width=500,
|
||||
plot_height=500,
|
||||
toolbar_location="below"), notebook_handle=True)
|
||||
|
||||
def update_plot(abs_earliest, abs_latest, abs_num_tasks, tasks):
|
||||
num_buckets = 100
|
||||
left, right, top = compute_utilizations(abs_earliest,
|
||||
abs_latest,
|
||||
abs_num_tasks,
|
||||
tasks,
|
||||
num_buckets)
|
||||
|
||||
time_series_source.data = {"left": left,
|
||||
"right": right,
|
||||
"top": top}
|
||||
|
||||
x_range = (max(0, min(left))
|
||||
if len(left) else 0,
|
||||
max(right) if len(right) else 1)
|
||||
y_range = (0, max(top) + 1 if len(top) else 1)
|
||||
|
||||
# Define the axis ranges
|
||||
x_range = helpers._get_range(x_range)
|
||||
time_series_fig.x_range.start = x_range.start
|
||||
time_series_fig.x_range.end = x_range.end
|
||||
|
||||
y_range = helpers._get_range(y_range)
|
||||
time_series_fig.y_range.start = y_range.start
|
||||
time_series_fig.y_range.end = num_cpus
|
||||
|
||||
# Push the updated data to the notebook
|
||||
push_notebook(handle=handle)
|
||||
|
||||
get_sliders(update_plot)
|
||||
plot_utilization()
|
||||
|
||||
|
||||
# Function to create the cluster usage "heat map"
|
||||
def cluster_usage():
|
||||
from bokeh.io import show, output_notebook, push_notebook
|
||||
from bokeh.resources import CDN
|
||||
from bokeh.plotting import figure
|
||||
from bokeh.models import (
|
||||
ColumnDataSource,
|
||||
HoverTool,
|
||||
LinearColorMapper,
|
||||
BasicTicker,
|
||||
ColorBar,
|
||||
)
|
||||
output_notebook(resources=CDN)
|
||||
|
||||
# Initial values
|
||||
source = ColumnDataSource(data={"node_ip_address": ['127.0.0.1'],
|
||||
"time": ['0.5'],
|
||||
"num_tasks": ['1'],
|
||||
"length": [1]})
|
||||
|
||||
# Define the color schema
|
||||
colors = ["#75968f",
|
||||
"#a5bab7",
|
||||
"#c9d9d3",
|
||||
"#e2e2e2",
|
||||
"#dfccce",
|
||||
"#ddb7b1",
|
||||
"#cc7878",
|
||||
"#933b41",
|
||||
"#550b1d"]
|
||||
mapper = LinearColorMapper(palette=colors, low=0, high=2)
|
||||
|
||||
TOOLS = "hover, save, xpan, box_zoom, reset, xwheel_zoom"
|
||||
|
||||
# Create the plot
|
||||
p = figure(title="Cluster Usage",
|
||||
y_range=list(set(source.data['node_ip_address'])),
|
||||
x_axis_location="above",
|
||||
plot_width=900,
|
||||
plot_height=500,
|
||||
tools=TOOLS,
|
||||
toolbar_location='below')
|
||||
|
||||
# Format the plot axes
|
||||
p.grid.grid_line_color = None
|
||||
p.axis.axis_line_color = None
|
||||
p.axis.major_tick_line_color = None
|
||||
p.axis.major_label_text_font_size = "10pt"
|
||||
p.axis.major_label_standoff = 0
|
||||
p.xaxis.major_label_orientation = np.pi / 3
|
||||
|
||||
# Plot rectangles
|
||||
p.rect(x="time", y="node_ip_address", width="length", height=1,
|
||||
source=source,
|
||||
fill_color={"field": "num_tasks", "transform": mapper},
|
||||
line_color=None)
|
||||
|
||||
# Add legend to the side of the plot
|
||||
color_bar = ColorBar(color_mapper=mapper,
|
||||
major_label_text_font_size="8pt",
|
||||
ticker=BasicTicker(desired_num_ticks=len(colors)),
|
||||
label_standoff=6,
|
||||
border_line_color=None,
|
||||
location=(0, 0))
|
||||
p.add_layout(color_bar, "right")
|
||||
|
||||
# Define hover tool
|
||||
p.select_one(HoverTool).tooltips = [
|
||||
("Node IP Address", "@node_ip_address"),
|
||||
("Number of tasks running", "@num_tasks"),
|
||||
("Time", "@time")
|
||||
]
|
||||
|
||||
# Define the axis labels
|
||||
p.xaxis.axis_label = "Time in seconds"
|
||||
p.yaxis.axis_label = "Node IP Address"
|
||||
handle = show(p, notebook_handle=True)
|
||||
workers = ray.global_state.workers()
|
||||
|
||||
# Function to update the heat map
|
||||
def heat_map_update(abs_earliest, abs_latest, abs_num_tasks, tasks):
|
||||
if len(tasks) == 0:
|
||||
return
|
||||
|
||||
earliest = time.time()
|
||||
latest = 0
|
||||
|
||||
node_to_tasks = dict()
|
||||
# Determine which task has the earlest start time out of the ones
|
||||
# passed into the update function
|
||||
for task_id, data in tasks.items():
|
||||
if data["score"] > latest:
|
||||
latest = data["score"]
|
||||
if data["score"] < earliest:
|
||||
earliest = data["score"]
|
||||
worker_id = data["worker_id"]
|
||||
node_ip = workers[worker_id]["node_ip_address"]
|
||||
if node_ip not in node_to_tasks:
|
||||
node_to_tasks[node_ip] = {}
|
||||
node_to_tasks[node_ip][task_id] = data
|
||||
|
||||
nodes = []
|
||||
times = []
|
||||
lengths = []
|
||||
num_tasks = []
|
||||
|
||||
for node_ip, task_dict in node_to_tasks.items():
|
||||
left, right, top = compute_utilizations(earliest,
|
||||
latest,
|
||||
abs_num_tasks,
|
||||
task_dict,
|
||||
100,
|
||||
True)
|
||||
for (l, r, t) in zip(left, right, top):
|
||||
nodes.append(node_ip)
|
||||
times.append((l + r) / 2)
|
||||
lengths.append(r - l)
|
||||
num_tasks.append(t)
|
||||
|
||||
# Set the y range of the plot to be the node IP addresses
|
||||
p.y_range.factors = list(set(nodes))
|
||||
|
||||
mapper.low = min(min(num_tasks), 0)
|
||||
mapper.high = max(max(num_tasks), 1)
|
||||
|
||||
# Update plot with new data based on slider and text box values
|
||||
source.data = {"node_ip_address": nodes,
|
||||
"time": times,
|
||||
"num_tasks": num_tasks,
|
||||
"length": lengths}
|
||||
|
||||
push_notebook(handle=handle)
|
||||
|
||||
get_sliders(heat_map_update)
|
||||
Reference in New Issue
Block a user