Task duration distribution plot. (#743)

* Task duration distribution plot.

* Fixed bug.

* Changed axis labels.

* Modify task start point.

* Modified task_profiles func to decode in ascii.

* Nvm

* Changed to double quotes and added comments.

* fixed linting

* Fixed linting.

* Fixed bug.
This commit is contained in:
alanamarzoev
2017-07-19 23:15:17 -07:00
committed by Robert Nishihara
parent d356dd3ec4
commit 853b2913b7
+134 -57
View File
@@ -9,14 +9,16 @@
"outputs": [],
"source": [
"import ipywidgets as widgets\n",
"import numpy as np\n",
"import os\n",
"import ray\n",
"import subprocess\n",
"import tempfile\n",
"import time\n",
"\n",
"from IPython.display import display\n",
"\n",
"ray.init(redis_address=os.environ['REDIS_ADDRESS'])"
"ray.init(redis_address=os.environ[\"REDIS_ADDRESS\"])"
]
},
{
@@ -37,9 +39,9 @@
"outputs": [],
"source": [
"object_search = widgets.Text(\n",
" value='',\n",
" placeholder='Object ID',\n",
" description='Search for an object:',\n",
" value=\"\",\n",
" placeholder=\"Object ID\",\n",
" description=\"Search for an object:\",\n",
" disabled=False\n",
")\n",
"display(object_search)\n",
@@ -67,9 +69,9 @@
"outputs": [],
"source": [
"task_search = widgets.Text(\n",
" value='',\n",
" placeholder='Task ID',\n",
" description='Search for a task:',\n",
" value=\"\",\n",
" placeholder=\"Task ID\",\n",
" description=\"Search for a task:\",\n",
" disabled=False\n",
")\n",
"display(task_search)\n",
@@ -103,20 +105,20 @@
" def __exit__(self, *args):\n",
" self.should_recurse = True\n",
"\n",
"total_time_value = '% total time'\n",
"total_tasks_value = '% total tasks'\n",
"total_time_value = \"% total time\"\n",
"total_tasks_value = \"% total tasks\"\n",
"\n",
"# Function that returns instances of sliders and handles associated events.\n",
"def get_sliders(update):\n",
"    # The start_box value indicates the desired start point of the queried window.\n",
" start_box = widgets.FloatText(\n",
" description='Start Time:',\n",
" description=\"Start Time:\",\n",
" disabled=True,\n",
" )\n",
" \n",
"    # The end_box value indicates the desired end point of the queried window.\n",
" end_box = widgets.FloatText(\n",
" description='End Time:',\n",
" description=\"End Time:\",\n",
" disabled=True,\n",
" )\n",
" \n",
@@ -127,17 +129,17 @@
" min=0,\n",
" max=100,\n",
" step=1,\n",
" description='%:',\n",
" description=\"%:\",\n",
" continuous_update=False,\n",
" orientation='horizontal',\n",
" orientation=\"horizontal\",\n",
" readout=True,\n",
" readout_format='.0i%',\n",
" readout_format=\".0i%\",\n",
" )\n",
" \n",
"    # Indicates the number of tasks that the user wants to be returned. This\n",
"    # box is disabled when the breakdown_opt value is set to total_time_value.\n",
" num_tasks_box = widgets.IntText(\n",
" description='Num Tasks:',\n",
" description=\"Num Tasks:\",\n",
" disabled=False\n",
" )\n",
" \n",
@@ -146,11 +148,11 @@
" breakdown_opt = widgets.Dropdown(\n",
" options=[total_time_value, total_tasks_value],\n",
" value=total_tasks_value,\n",
" description='Selection Options:'\n",
" description=\"Selection Options:\"\n",
" )\n",
" \n",
" # Initially passed in to the update_wrapper function.\n",
" INIT_EVENT = 'INIT'\n",
" INIT_EVENT = \"INIT\"\n",
" \n",
"    # Create an instance of the context manager used to determine whether\n",
"    # the callback is currently executing.\n",
@@ -182,7 +184,7 @@
" end_box.value = round(diff * high, 2)\n",
" \n",
" # Event was triggered by a change in the start_box value. \n",
" elif event['owner'] == start_box:\n",
" elif event[\"owner\"] == start_box:\n",
" if start_box.value > end_box.value:\n",
" start_box.value = end_box.value\n",
" elif start_box.value < 0:\n",
@@ -191,7 +193,7 @@
" range_slider.value = (int((start_box.value * 100.) / diff), high)\n",
" \n",
" # Event was triggered by a change in the end_box value. \n",
" elif event['owner'] == end_box:\n",
" elif event[\"owner\"] == end_box:\n",
" if start_box.value > end_box.value:\n",
" end_box.value = start_box.value\n",
" elif end_box.value > diff:\n",
@@ -201,7 +203,7 @@
" \n",
" # Event was triggered by a change in the breakdown options\n",
" # toggle.\n",
" elif event['owner'] == breakdown_opt:\n",
" elif event[\"owner\"] == breakdown_opt:\n",
" if breakdown_opt.value == total_tasks_value:\n",
" start_box.disabled = True\n",
" end_box.disabled = True\n",
@@ -217,11 +219,11 @@
" \n",
" # Event was triggered by a change in the range_slider\n",
" # value.\n",
" elif event['owner'] == range_slider:\n",
" elif event[\"owner\"] == range_slider:\n",
" low, high = map(lambda x: x / 100., range_slider.value)\n",
" if breakdown_opt.value == total_tasks_value:\n",
" old_low, old_high = event['old']\n",
" new_low, new_high = event['new']\n",
" old_low, old_high = event[\"old\"]\n",
" new_low, new_high = event[\"new\"]\n",
" if old_low != new_low:\n",
" range_slider.value = (new_low, 100)\n",
" num_tasks_box.value = -(100. - new_low) / 100. * num_tasks\n",
@@ -234,16 +236,12 @@
" \n",
" # Event was triggered by a change in the num_tasks_box\n",
" # value.\n",
" elif event['owner'] == num_tasks_box:\n",
" elif event[\"owner\"] == num_tasks_box:\n",
" if num_tasks_box.value > 0:\n",
" range_slider.value = (0, int(100 * float(num_tasks_box.value) / num_tasks))\n",
" elif num_tasks_box.value < 0:\n",
" range_slider.value = (100 + int(100 * float(num_tasks_box.value) / num_tasks), 100)\n",
" \n",
" # This should never happen. \n",
" else:\n",
" raise ValueError('Unknown event owner!')\n",
"\n",
" \n",
" if not update:\n",
" return\n",
"\n",
@@ -266,20 +264,16 @@
" tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * high), fwd=True)\n",
" else:\n",
" tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * (high - low)), fwd=False)\n",
" \n",
" # This should never happen.\n",
" else:\n",
" raise ValueError('Value \"{}\" is not a legal breakdown value'.format(breakdown_opt.value))\n",
"\n",
" update(smallest, largest, num_tasks, tasks)\n",
" \n",
" # Get updated values from a slider or text box, and update the rest of\n",
" # them accordingly.\n",
" range_slider.observe(update_wrapper, names='value')\n",
" breakdown_opt.observe(update_wrapper, names='value')\n",
" start_box.observe(update_wrapper, names='value')\n",
" end_box.observe(update_wrapper, names='value')\n",
" num_tasks_box.observe(update_wrapper, names='value')\n",
" range_slider.observe(update_wrapper, names=\"value\")\n",
" breakdown_opt.observe(update_wrapper, names=\"value\")\n",
" start_box.observe(update_wrapper, names=\"value\")\n",
" end_box.observe(update_wrapper, names=\"value\")\n",
" num_tasks_box.observe(update_wrapper, names=\"value\")\n",
" \n",
" # Initializes the sliders\n",
" update_wrapper(INIT_EVENT)\n",
@@ -300,15 +294,15 @@
"outputs": [],
"source": [
"def task_timeline():\n",
" path_input = widgets.Button(description='View task timeline')\n",
" path_input = widgets.Button(description=\"View task timeline\")\n",
"\n",
" breakdown_basic = 'Basic'\n",
" breakdown_task = 'Task Breakdowns'\n",
" breakdown_basic = \"Basic\"\n",
" breakdown_task = \"Task Breakdowns\"\n",
" \n",
" breakdown_opt = widgets.Dropdown(\n",
" options=['Basic', 'Task Breakdowns'],\n",
" value='Basic',\n",
" description='View options:',\n",
" options=[\"Basic\", \"Task Breakdowns\"],\n",
" value=\"Basic\",\n",
" description=\"View options:\",\n",
" disabled=False,\n",
" )\n",
" \n",
@@ -317,26 +311,26 @@
" display(path_input)\n",
"\n",
" def find_trace2html():\n",
" trace2html = '/tmp/ray/catapult/tracing/bin/trace2html'\n",
" trace2html = \"/tmp/ray/catapult/tracing/bin/trace2html\"\n",
" # Clone the catapult repository if it doesn't exist. TODO(rkn): We\n",
" # could do this in the build.sh script later on.\n",
" if not os.path.exists(trace2html): \n",
" cmd = ['git', 'clone', 'https://github.com/catapult-project/catapult.git', '/tmp/ray/catapult']\n",
" cmd = [\"git\", \"clone\", \"https://github.com/catapult-project/catapult.git\", \"/tmp/ray/catapult\"]\n",
" subprocess.check_output(cmd) \n",
" print('Cloning catapult to /tmp/ray/catapult.')\n",
" print(\"Cloning catapult to /tmp/ray/catapult.\")\n",
" assert os.path.exists(trace2html)\n",
" return trace2html\n",
"\n",
" def handle_submit(sender):\n",
" tmp = tempfile.mktemp() + '.json'\n",
" tmp2 = tempfile.mktemp() + '.html'\n",
" tmp = tempfile.mktemp() + \".json\"\n",
" tmp2 = tempfile.mktemp() + \".html\"\n",
" \n",
" if breakdown_opt.value == breakdown_basic:\n",
" breakdown = False\n",
" elif breakdown_opt.value == breakdown_task:\n",
" breakdown = True\n",
" else:\n",
" raise ValueError('Unexpected breakdown value \"{}\"'.format(breakdown_opt.value))\n",
" raise ValueError(\"Unexpected breakdown value '{}'\".format(breakdown_opt.value))\n",
" \n",
" low, high = map(lambda x: x / 100., range_slider.value)\n",
" \n",
@@ -351,27 +345,110 @@
" else:\n",
" tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * (high - low)), fwd=False)\n",
" else:\n",
" raise ValueError('Unexpected time value \"{}\"'.format(time_opt.value))\n",
" raise ValueError(\"Unexpected time value '{}'\".format(time_opt.value))\n",
" \n",
" print('{} tasks to trace'.format(len(tasks)))\n",
" print('Dumping task profiling data to ' + tmp)\n",
" print(\"{} tasks to trace\".format(len(tasks)))\n",
" print(\"Dumping task profiling data to \" + tmp)\n",
" ray.global_state.dump_catapult_trace(tmp, tasks, breakdowns=breakdown)\n",
" print('Converting chrome trace to ' + tmp2)\n",
" print(\"Converting chrome trace to \" + tmp2)\n",
" trace2html = find_trace2html()\n",
" # TODO(rkn): The trace2html script currently requires Python 2.\n",
" # Remove this dependency.\n",
" subprocess.check_output(['python2', trace2html, tmp, '--output', tmp2])\n",
" subprocess.check_output([\"python2\", trace2html, tmp, \"--output\", tmp2])\n",
" # Open the timeline in Chrome. TODO(rkn): We should remove the\n",
" # dependency on Chrome and use whatever browser is currently being\n",
" # used. Note that this currently does not work when Ray is being\n",
" # used on a cluster and the browser is running locally.\n",
" print('Opening html file in browser...')\n",
" subprocess.Popen(['open', '-a', 'Google Chrome', tmp2])\n",
" print(\"Opening html file in browser...\")\n",
" subprocess.Popen([\"open\", \"-a\", \"Google Chrome\", tmp2])\n",
"\n",
" path_input.on_click(handle_submit)\n",
"\n",
"task_timeline()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Task duration distribution plot."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"collapsed": true
},
"outputs": [],
"source": [
"from bokeh.models import Range1d, ColumnDataSource\n",
"from bokeh.layouts import gridplot\n",
"from bokeh.plotting import figure, show, helpers\n",
"from bokeh.io import output_notebook, push_notebook\n",
"from bokeh.resources import CDN\n",
"output_notebook(resources=CDN)\n",
"\n",
"\n",
"def task_completion_time_distribution():\n",
" # Create the Bokeh plot \n",
" p = figure(title=\"Task Completion Time Distribution\",tools=[\"save\", \"hover\", \"wheel_zoom\", \"box_zoom\", \"pan\"],\n",
" background_fill_color=\"#FFFFFF\", x_range=(0, 1), y_range = (0, 1))\n",
" \n",
" # Create the data source that the plot pulls from \n",
" source = ColumnDataSource(data={\n",
" \"top\": [],\n",
" \"left\": [],\n",
" \"right\": []\n",
" })\n",
" \n",
" # Plot the histogram rectangles\n",
" p.quad(top=\"top\", bottom=0, left=\"left\", right=\"right\", source=source,\n",
" fill_color=\"#B3B3B3\", line_color=\"#033649\")\n",
"\n",
" # Label the plot axes \n",
" p.xaxis.axis_label = \"Duration in seconds\"\n",
" p.yaxis.axis_label = \"Number of tasks\"\n",
"\n",
" handle = show(gridplot(p, ncols=1, plot_width=500, plot_height=500, toolbar_location=\"below\"),\n",
" notebook_handle=True)\n",
"\n",
" # Function to update the plot \n",
" def task_completion_time_update(abs_earliest, abs_latest, abs_num_tasks, tasks):\n",
" if len(tasks) == 0:\n",
" return\n",
" \n",
" # Create the distribution to plot\n",
" distr = []\n",
" for task_id, data in tasks.items():\n",
" distr.append(data[\"store_outputs_end\"] - data[\"get_arguments_start\"])\n",
" \n",
" # Create a histogram from the distribution \n",
" top, bin_edges = np.histogram(distr, bins=\"auto\")\n",
" left = bin_edges[:-1]\n",
" right = bin_edges[1:]\n",
"\n",
" source.data = {\"top\": top, \"left\": left, \"right\": right}\n",
" \n",
" # Set the x and y ranges \n",
" x_range = (min(left) if len(left) else 0, max(right) if len(right) else 1)\n",
" y_range = (0, max(top) + 1 if len(top) else 1)\n",
" \n",
" x_range = helpers._get_range(x_range)\n",
" p.x_range.start = x_range.start\n",
" p.x_range.end = x_range.end\n",
"\n",
" y_range = helpers._get_range(y_range)\n",
" p.y_range.start = y_range.start\n",
" p.y_range.end = y_range.end\n",
" \n",
" # Push updates to the plot\n",
" push_notebook(handle=handle)\n",
" \n",
" get_sliders(task_completion_time_update)\n",
"\n",
"task_completion_time_distribution()"
]
}
],
"metadata": {