diff --git a/python/ray/WebUI.ipynb b/python/ray/WebUI.ipynb index f3714a31c..0ba0b65b7 100644 --- a/python/ray/WebUI.ipynb +++ b/python/ray/WebUI.ipynb @@ -4,741 +4,118 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "### Ray UI.\n", + "# Ray UI\n", "\n", - "#### To run the UI: \n", - "1. Kernel -> Restart and Run All \n", - "\n", - "\n", - "#### To hide the code:\n", - "\n", - "1. Install the hide_code extension.\n", - "```\n", - "pip install hide_code\n", - "jupyter nbextension install --py hide_code\n", - "jupyter nbextension enable --py hide_code\n", - "jupyter serverextension enable --py hide_code\n", - "```\n", - "2. Go to View -> Cell Toolbar -> Hide Code. \n", - "3. Press ESC then T" + "Start the UI with **Kernel -> Restart and Run All**." ] }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, + "metadata": {}, "outputs": [], "source": [ - "import ipywidgets as widgets\n", - "import numpy as np\n", "import os\n", - "import pprint\n", "import ray\n", - "import subprocess\n", - "import tempfile\n", - "import time\n", - "\n", - "from IPython.display import display\n", + "import ray.experimental.ui as ui\n", "\n", "ray.init(redis_address=os.environ[\"REDIS_ADDRESS\"])" ] }, { "cell_type": "markdown", - "metadata": { - "collapsed": true - }, + "metadata": {}, "source": [ - "#### Evaluate the box below to search for objects." + "#### Object search." 
] }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, + "metadata": {}, "outputs": [], "source": [ - "object_search = widgets.Text(\n", - " value=\"\",\n", - " placeholder=\"Object ID\",\n", - " description=\"Search for an object:\",\n", - " disabled=False\n", - ")\n", - "display(object_search)\n", - "\n", - "def handle_submit(sender):\n", - " pp = pprint.PrettyPrinter()\n", - " pp.pprint(ray.global_state.object_table(object_search.value))\n", - "\n", - "object_search.on_submit(handle_submit)" + "ui.object_search_bar()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "#### Evaluate the box below to search for tasks." + "#### Task search." ] }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, + "metadata": {}, "outputs": [], "source": [ - "task_search = widgets.Text(\n", - " value=\"\",\n", - " placeholder=\"Task ID\",\n", - " description=\"Search for a task:\",\n", - " disabled=False\n", - ")\n", - "display(task_search)\n", - "\n", - "def handle_submit(sender):\n", - " pp = pprint.PrettyPrinter()\n", - " pp.pprint(ray.global_state.task_table(task_search.value))\n", - "\n", - "task_search.on_submit(handle_submit)" - ] - }, - { - "cell_type": "code", - "execution_count": 1, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "# Instances of this class maintains keep track of whether or not a\n", - "# callback is currently executing. 
Since the execution of the callback\n", - "# may trigger more calls to the callback, this is used to prevent infinite\n", - "# recursions.\n", - "class _EventRecursionContextManager(object):\n", - " def __init__(self):\n", - " self.should_recurse = True\n", - " \n", - " def __enter__(self):\n", - " self.should_recurse = False\n", - " \n", - " def __exit__(self, *args):\n", - " self.should_recurse = True\n", - "\n", - "total_time_value = \"% total time\"\n", - "total_tasks_value = \"% total tasks\"\n", - "\n", - "# Function that returns instances of sliders and handles associated events.\n", - "def get_sliders(update):\n", - " # Start_box value indicates the desired start point of queried window.\n", - " start_box = widgets.FloatText(\n", - " description=\"Start Time:\",\n", - " disabled=True,\n", - " )\n", - " \n", - " # End_box value indicates the desired end point of queried window.\n", - " end_box = widgets.FloatText(\n", - " description=\"End Time:\",\n", - " disabled=True,\n", - " )\n", - " \n", - " # Percentage slider. Indicates either % of total time or total tasks\n", - " # depending on what breakdown_opt is set to.\n", - " range_slider = widgets.IntRangeSlider(\n", - " value=[70, 100],\n", - " min=0,\n", - " max=100,\n", - " step=1,\n", - " description=\"%:\",\n", - " continuous_update=False,\n", - " orientation=\"horizontal\",\n", - " readout=True,\n", - " readout_format=\".0i%\",\n", - " )\n", - " \n", - " # Indicates the number of tasks that the user wants to be returned. Is\n", - " # disabled when the breakdown_opt value is set to total_time_value. 
\n", - " num_tasks_box = widgets.IntText(\n", - " description=\"Num Tasks:\",\n", - " disabled=False\n", - " )\n", - " \n", - " # Dropdown bar that lets the user choose between modifying % of total\n", - " # time or total number of tasks.\n", - " breakdown_opt = widgets.Dropdown(\n", - " options=[total_time_value, total_tasks_value],\n", - " value=total_tasks_value,\n", - " description=\"Selection Options:\"\n", - " )\n", - " \n", - " # Initially passed in to the update_wrapper function.\n", - " INIT_EVENT = \"INIT\"\n", - " \n", - " # Create instance of context manager to determine whether callback is\n", - " # currently executing \n", - " out_recursion = _EventRecursionContextManager()\n", - " \n", - " def update_wrapper(event):\n", - " # Feature received a callback, but it shouldn't be executed\n", - " # because the callback was the result of a different feature\n", - " # executing its callback based on user input. \n", - " if not out_recursion.should_recurse:\n", - " return\n", - " \n", - " # Feature received a callback and it should be executed because\n", - " # the callback was the result of user input. \n", - " with out_recursion:\n", - " smallest, largest, num_tasks = ray.global_state._job_length()\n", - " diff = largest - smallest\n", - " if num_tasks is not 0: \n", - " \n", - " # Describes the initial values that the slider/text box\n", - " # values should be set to.\n", - " if event == INIT_EVENT:\n", - " if breakdown_opt.value == total_tasks_value:\n", - " num_tasks_box.value = -min(10000, num_tasks)\n", - " range_slider.value = (int(100 - (100. * -num_tasks_box.value) / num_tasks), 100)\n", - " else:\n", - " low, high = map(lambda x: x / 100., range_slider.value)\n", - " start_box.value = round(diff * low, 2)\n", - " end_box.value = round(diff * high, 2)\n", - " \n", - " # Event was triggered by a change in the start_box value. 
\n", - " elif event[\"owner\"] == start_box:\n", - " if start_box.value > end_box.value:\n", - " start_box.value = end_box.value\n", - " elif start_box.value < 0:\n", - " start_box.value = 0\n", - " low, high = range_slider.value\n", - " range_slider.value = (int((start_box.value * 100.) / diff), high)\n", - " \n", - " # Event was triggered by a change in the end_box value. \n", - " elif event[\"owner\"] == end_box:\n", - " if start_box.value > end_box.value:\n", - " end_box.value = start_box.value\n", - " elif end_box.value > diff:\n", - " end_box.value = diff\n", - " low, high = range_slider.value\n", - " range_slider.value = (low, int((end_box.value * 100.) / diff))\n", - " \n", - " # Event was triggered by a change in the breakdown options\n", - " # toggle.\n", - " elif event[\"owner\"] == breakdown_opt:\n", - " if breakdown_opt.value == total_tasks_value:\n", - " start_box.disabled = True\n", - " end_box.disabled = True\n", - " num_tasks_box.disabled = False\n", - " num_tasks_box.value = min(10000, num_tasks)\n", - " range_slider.value = (int(100 - (100. * num_tasks_box.value) / num_tasks), 100)\n", - " else:\n", - " start_box.disabled = False\n", - " end_box.disabled = False\n", - " num_tasks_box.disabled = True\n", - " range_slider.value = (int((start_box.value * 100.) / diff),\n", - " int((end_box.value * 100.) / diff))\n", - " \n", - " # Event was triggered by a change in the range_slider\n", - " # value.\n", - " elif event[\"owner\"] == range_slider:\n", - " low, high = map(lambda x: x / 100., range_slider.value)\n", - " if breakdown_opt.value == total_tasks_value:\n", - " old_low, old_high = event[\"old\"]\n", - " new_low, new_high = event[\"new\"]\n", - " if old_low != new_low:\n", - " range_slider.value = (new_low, 100)\n", - " num_tasks_box.value = -(100. - new_low) / 100. * num_tasks\n", - " else:\n", - " range_slider.value = (0, new_high)\n", - " num_tasks_box.value = new_high / 100. 
* num_tasks\n", - " else:\n", - " start_box.value = round(diff * low, 2)\n", - " end_box.value = round(diff * high, 2)\n", - " \n", - " # Event was triggered by a change in the num_tasks_box\n", - " # value.\n", - " elif event[\"owner\"] == num_tasks_box:\n", - " if num_tasks_box.value > 0:\n", - " range_slider.value = (0, int(100 * float(num_tasks_box.value) / num_tasks))\n", - " elif num_tasks_box.value < 0:\n", - " range_slider.value = (100 + int(100 * float(num_tasks_box.value) / num_tasks), 100)\n", - " \n", - " if not update:\n", - " return\n", - "\n", - " diff = largest - smallest\n", - " \n", - " # Low and high are used to scale the times that are\n", - " # queried to be relative to the absolute time. \n", - " low, high = map(lambda x: x / 100., range_slider.value)\n", - " \n", - " # Queries to task_profiles based on the slider and text\n", - " # box values.\n", - " # (Querying based on the % total amount of time.) \n", - " if breakdown_opt.value == total_time_value:\n", - " tasks = ray.global_state.task_profiles(start=smallest + diff * low, end=smallest + diff * high)\n", - " \n", - " # (Querying based on % of total number of tasks that were\n", - " # run.)\n", - " elif breakdown_opt.value == total_tasks_value:\n", - " if range_slider.value[0] == 0:\n", - " tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * high), fwd=True)\n", - " else:\n", - " tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * (high - low)), fwd=False)\n", - "\n", - " update(smallest, largest, num_tasks, tasks)\n", - " \n", - " # Get updated values from a slider or text box, and update the rest of\n", - " # them accordingly.\n", - " range_slider.observe(update_wrapper, names=\"value\")\n", - " breakdown_opt.observe(update_wrapper, names=\"value\")\n", - " start_box.observe(update_wrapper, names=\"value\")\n", - " end_box.observe(update_wrapper, names=\"value\")\n", - " num_tasks_box.observe(update_wrapper, names=\"value\")\n", - " \n", - " # Initializes the 
sliders\n", - " update_wrapper(INIT_EVENT)\n", - " \n", - " # Display sliders and search boxes \n", - " display(start_box, end_box, range_slider, num_tasks_box, breakdown_opt)\n", - " \n", - " # Return the sliders and text boxes \n", - " return start_box, end_box, range_slider, breakdown_opt" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "def task_timeline():\n", - " path_input = widgets.Button(description=\"View task timeline\")\n", - "\n", - " breakdown_basic = \"Basic\"\n", - " breakdown_task = \"Task Breakdowns\"\n", - " \n", - " breakdown_opt = widgets.Dropdown(\n", - " options=[\"Basic\", \"Task Breakdowns\"],\n", - " value=\"Basic\",\n", - " description=\"View options:\",\n", - " disabled=False,\n", - " )\n", - " \n", - " start_box, end_box, range_slider, time_opt = get_sliders(False)\n", - " display(breakdown_opt)\n", - " display(path_input)\n", - "\n", - " def find_trace2html():\n", - " trace2html = \"/tmp/ray/catapult/tracing/bin/trace2html\"\n", - " # Clone the catapult repository if it doesn't exist. 
TODO(rkn): We\n", - " # could do this in the build.sh script later on.\n", - " if not os.path.exists(trace2html): \n", - " cmd = [\"git\", \"clone\", \"https://github.com/catapult-project/catapult.git\", \"/tmp/ray/catapult\"]\n", - " subprocess.check_output(cmd) \n", - " print(\"Cloning catapult to /tmp/ray/catapult.\")\n", - " assert os.path.exists(trace2html)\n", - " return trace2html\n", - "\n", - " def handle_submit(sender):\n", - " tmp = tempfile.mktemp() + \".json\"\n", - " tmp2 = tempfile.mktemp() + \".html\"\n", - " \n", - " if breakdown_opt.value == breakdown_basic:\n", - " breakdown = False\n", - " elif breakdown_opt.value == breakdown_task:\n", - " breakdown = True\n", - " else:\n", - " raise ValueError(\"Unexpected breakdown value '{}'\".format(breakdown_opt.value))\n", - " \n", - " low, high = map(lambda x: x / 100., range_slider.value)\n", - " \n", - " smallest, largest, num_tasks = ray.global_state._job_length()\n", - " diff = largest - smallest\n", - "\n", - " if time_opt.value == total_time_value:\n", - " tasks = ray.global_state.task_profiles(start=smallest + diff * low, end=smallest + diff * high)\n", - " elif time_opt.value == total_tasks_value:\n", - " if range_slider.value[0] == 0:\n", - " tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * high), fwd=True)\n", - " else:\n", - " tasks = ray.global_state.task_profiles(num_tasks=int(num_tasks * (high - low)), fwd=False)\n", - " else:\n", - " raise ValueError(\"Unexpected time value '{}'\".format(time_opt.value))\n", - " \n", - " print(\"{} tasks to trace\".format(len(tasks)))\n", - " print(\"Dumping task profiling data to \" + tmp)\n", - " ray.global_state.dump_catapult_trace(tmp, tasks, breakdowns=breakdown)\n", - " print(\"Converting chrome trace to \" + tmp2)\n", - " trace2html = find_trace2html()\n", - " # TODO(rkn): The trace2html script currently requires Python 2.\n", - " # Remove this dependency.\n", - " subprocess.check_output([\"python2\", trace2html, tmp, \"--output\", 
tmp2])\n", - " # Open the timeline in Chrome. TODO(rkn): We should remove the\n", - " # dependency on Chrome and use whatever browser is currently being\n", - " # used. Note that this currently does not work when Ray is being\n", - " # used on a cluster and the browser is running locally.\n", - " print(\"Opening html file in browser...\")\n", - " subprocess.Popen([\"open\", \"-a\", \"Google Chrome\", tmp2])\n", - "\n", - " path_input.on_click(handle_submit)\n", - "\n", - "task_timeline()" + "ui.task_search_bar()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "#### Task duration distribution plot." + "#### Task trace timeline." ] }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, + "metadata": {}, "outputs": [], "source": [ - "from bokeh.models import Range1d, ColumnDataSource\n", - "from bokeh.layouts import gridplot\n", - "from bokeh.plotting import figure, show, helpers\n", - "from bokeh.io import output_notebook, push_notebook\n", - "from bokeh.resources import CDN\n", - "output_notebook(resources=CDN)\n", - "\n", - "\n", - "def task_completion_time_distribution():\n", - " # Create the Bokeh plot \n", - " p = figure(title=\"Task Completion Time Distribution\",tools=[\"save\", \"hover\", \"wheel_zoom\", \"box_zoom\", \"pan\"],\n", - " background_fill_color=\"#FFFFFF\", x_range=(0, 1), y_range = (0, 1))\n", - " \n", - " # Create the data source that the plot pulls from \n", - " source = ColumnDataSource(data={\n", - " \"top\": [],\n", - " \"left\": [],\n", - " \"right\": []\n", - " })\n", - " \n", - " # Plot the histogram rectangles\n", - " p.quad(top=\"top\", bottom=0, left=\"left\", right=\"right\", source=source,\n", - " fill_color=\"#B3B3B3\", line_color=\"#033649\")\n", - "\n", - " # Label the plot axes \n", - " p.xaxis.axis_label = \"Duration in seconds\"\n", - " p.yaxis.axis_label = \"Number of tasks\"\n", - "\n", - " handle = show(gridplot(p, ncols=1, plot_width=500, plot_height=500, 
toolbar_location=\"below\"),\n", - " notebook_handle=True)\n", - "\n", - " # Function to update the plot \n", - " def task_completion_time_update(abs_earliest, abs_latest, abs_num_tasks, tasks):\n", - " if len(tasks) == 0:\n", - " return\n", - " \n", - " # Create the distribution to plot\n", - " distr = []\n", - " for task_id, data in tasks.items():\n", - " distr.append(data[\"store_outputs_end\"] - data[\"get_arguments_start\"])\n", - " \n", - " # Create a histogram from the distribution \n", - " top, bin_edges = np.histogram(distr, bins=\"auto\")\n", - " left = bin_edges[:-1]\n", - " right = bin_edges[1:]\n", - "\n", - " source.data = {\"top\": top, \"left\": left, \"right\": right}\n", - " \n", - " # Set the x and y ranges \n", - " x_range = (min(left) if len(left) else 0, max(right) if len(right) else 1)\n", - " y_range = (0, max(top) + 1 if len(top) else 1)\n", - " \n", - " x_range = helpers._get_range(x_range)\n", - " p.x_range.start = x_range.start\n", - " p.x_range.end = x_range.end\n", - "\n", - " y_range = helpers._get_range(y_range)\n", - " p.y_range.start = y_range.start\n", - " p.y_range.end = y_range.end\n", - " \n", - " # Push updates to the plot\n", - " push_notebook(handle=handle)\n", - " \n", - " get_sliders(task_completion_time_update)\n", - "\n", - "task_completion_time_distribution()" + "ui.task_timeline()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "#### CPU usage over time plot." + "#### Task durations." 
] }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, + "metadata": {}, "outputs": [], "source": [ - "from bokeh.layouts import gridplot\n", - "from bokeh.plotting import figure, show, helpers\n", - "from bokeh.resources import CDN\n", - "from bokeh.io import output_notebook, push_notebook\n", - "from bokeh.models import Range1d, ColumnDataSource\n", - "import numpy as np\n", - "output_notebook(resources=CDN)\n", - " \n", - "# Parse the client table to determine how many CPUs are available\n", - "num_cpus = 0 \n", - "client_table = ray.global_state.client_table()\n", - "for node_ip, client_list in client_table.items(): \n", - " for client in client_list: \n", - " if \"NumCPUs\" in client: \n", - " num_cpus += client[\"NumCPUs\"]\n", - "\n", - "def compute_utilizations(abs_earliest, abs_latest, num_tasks, tasks, num_buckets, use_abs_times=False):\n", - " \n", - " if len(tasks) == 0:\n", - " return [], [], []\n", - " \n", - " if use_abs_times:\n", - " earliest_time = abs_earliest\n", - " latest_time = abs_latest\n", - " else:\n", - " # Determine what the earliest and latest tasks are out of the ones that are passed in\n", - " earliest_time = time.time()\n", - " latest_time = 0\n", - " for task_id, data in tasks.items():\n", - " latest_time = max((latest_time, data[\"store_outputs_end\"]))\n", - " earliest_time = min((earliest_time, data[\"get_arguments_start\"]))\n", - " \n", - " # Add some epsilon to latest_time to ensure that the end time of the last task\n", - " # falls __within__ a bucket, and not on the edge\n", - " latest_time += 1e-6\n", - " \n", - " # Compute average CPU utilization per time bucket by summing cpu-time per bucket\n", - " bucket_time_length = (latest_time - earliest_time) / float(num_buckets)\n", - " cpu_time = [0 for _ in range(num_buckets)]\n", - " \n", - " for data in tasks.values():\n", - " task_start_time = data[\"get_arguments_start\"]\n", - " task_end_time = data[\"store_outputs_end\"]\n", - " \n", 
- " start_bucket = int((task_start_time - earliest_time) / bucket_time_length)\n", - " end_bucket = int((task_end_time - earliest_time) / bucket_time_length)\n", - " # Walk over each time bucket that this task intersects, adding the amount of\n", - " # time that the task intersects within each bucket\n", - " for bucket_idx in range(start_bucket, end_bucket + 1):\n", - " bucket_start_time = earliest_time + bucket_idx * bucket_time_length\n", - " bucket_end_time = earliest_time + (bucket_idx + 1) * bucket_time_length\n", - " \n", - " task_start_time_within_bucket = max(task_start_time, bucket_start_time)\n", - " task_end_time_within_bucket = min(task_end_time, bucket_end_time)\n", - " task_cpu_time_within_bucket = task_end_time_within_bucket - task_start_time_within_bucket\n", - " \n", - " if bucket_idx > -1 and bucket_idx < num_buckets: \n", - " cpu_time[bucket_idx] += task_cpu_time_within_bucket\n", - " \n", - " # Cpu_utilization is the average cpu utilization of the bucket, which is just\n", - " # cpu_time divided by bucket_time_length\n", - " cpu_utilization = list(map(lambda x: x / float(bucket_time_length), cpu_time))\n", - " \n", - " # Generate histogram bucket edges. 
Subtract out abs_earliest to get relative time\n", - " all_edges = [earliest_time - abs_earliest + i * bucket_time_length for i in range(num_buckets + 1)]\n", - " # Left edges are all but the rightmost edge, right edges are all but the leftmost edge\n", - " left_edges = all_edges[:-1]\n", - " right_edges = all_edges[1:]\n", - " \n", - " return left_edges, right_edges, cpu_utilization\n", - " \n", - "\n", - "# Update the plot based on the sliders\n", - "def plot_utilization():\n", - " # Create the Bokeh plot\n", - " time_series_fig = figure(title=\"CPU Utilization\",\n", - " tools=[\"save\", \"hover\", \"wheel_zoom\", \"box_zoom\", \"pan\"],\n", - " background_fill_color=\"#FFFFFF\", x_range=[0, 1], y_range=[0, 1])\n", - " \n", - " # Create the data source that the plot will pull from\n", - " time_series_source = ColumnDataSource(data=dict(\n", - " left=[],\n", - " right=[],\n", - " top=[]\n", - " ))\n", - " \n", - " # Plot the rectangles representing the distribution\n", - " time_series_fig.quad(left=\"left\", right=\"right\", top=\"top\", bottom=0,\n", - " source=time_series_source, fill_color=\"#B3B3B3\", line_color=\"#033649\")\n", - " \n", - " # Label the plot axes\n", - " time_series_fig.xaxis.axis_label = \"Time in seconds\"\n", - " time_series_fig.yaxis.axis_label = \"Number of CPUs used\"\n", - " \n", - " handle = show(gridplot(time_series_fig, ncols=1, plot_width=500, plot_height=500, toolbar_location=\"below\"),\n", - " notebook_handle=True)\n", - " \n", - " def update_plot(abs_earliest, abs_latest, abs_num_tasks, tasks):\n", - " num_buckets = 100\n", - " left, right, top = compute_utilizations(abs_earliest, abs_latest, abs_num_tasks, tasks, num_buckets)\n", - " \n", - " time_series_source.data = {\"left\": left, \"right\": right, \"top\": top}\n", - " \n", - " x_range = (max(0, min(left)) if len(left) else 0, max(right) if len(right) else 1)\n", - " y_range = (0, max(top) + 1 if len(top) else 1)\n", - " \n", - " # Define the axis ranges\n", - " x_range = 
helpers._get_range(x_range)\n", - " time_series_fig.x_range.start = x_range.start\n", - " time_series_fig.x_range.end = x_range.end\n", - " \n", - " y_range = helpers._get_range(y_range)\n", - " time_series_fig.y_range.start = y_range.start\n", - " time_series_fig.y_range.end = num_cpus\n", - " \n", - " # Push the updated data to the notebook\n", - " push_notebook(handle=handle)\n", - " \n", - " get_sliders(update_plot)\n", - "\n", - "plot_utilization()" + "ui.task_completion_time_distribution()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "#### Cluster usage over time \"heat map\"." + "#### CPU usage." ] }, { "cell_type": "code", "execution_count": null, - "metadata": { - "collapsed": true - }, + "metadata": {}, "outputs": [], "source": [ - "from bokeh.io import show, output_notebook, push_notebook\n", - "from bokeh.resources import CDN\n", - "from bokeh.plotting import figure, helpers\n", - "from bokeh.models import (\n", - " ColumnDataSource,\n", - " HoverTool,\n", - " LinearColorMapper,\n", - " BasicTicker,\n", - " PrintfTickFormatter,\n", - " ColorBar,\n", - ")\n", - "output_notebook(resources=CDN)\n", - "\n", - "# Function to create the cluster usage \"heat map\" \n", - "def cluster_usage():\n", - " \n", - " # Initial values \n", - " source = ColumnDataSource(data={\"node_ip_address\":['127.0.0.1'], \"time\":['0.5'], \"num_tasks\":['1'], \"length\": [1]})\n", - "\n", - " # Define the color schema \n", - " colors = [\"#75968f\", \"#a5bab7\", \"#c9d9d3\", \"#e2e2e2\", \"#dfccce\", \"#ddb7b1\", \"#cc7878\", \"#933b41\", \"#550b1d\"]\n", - " mapper = LinearColorMapper(palette=colors, low=0, high=2)\n", - "\n", - " TOOLS = \"hover, save, xpan, box_zoom, reset, xwheel_zoom\"\n", - " \n", - " # Create the plot \n", - " p = figure(title=\"Cluster Usage\", y_range=list(set(source.data['node_ip_address'])),\n", - " x_axis_location=\"above\", plot_width=900, plot_height=500,\n", - " tools=TOOLS, toolbar_location='below')\n", - "\n", - " # Format the 
plot axes \n", - " p.grid.grid_line_color = None\n", - " p.axis.axis_line_color = None\n", - " p.axis.major_tick_line_color = None\n", - " p.axis.major_label_text_font_size = \"10pt\"\n", - " p.axis.major_label_standoff = 0\n", - " p.xaxis.major_label_orientation = np.pi / 3\n", - "\n", - " # Plot rectangles\n", - " p.rect(x=\"time\", y=\"node_ip_address\", width=\"length\", height=1,\n", - " source=source,\n", - " fill_color={\"field\": \"num_tasks\", \"transform\": mapper},\n", - " line_color=None)\n", - "\n", - " # Add legend to the side of the plot\n", - " color_bar = ColorBar(color_mapper=mapper, major_label_text_font_size=\"8pt\",\n", - " ticker=BasicTicker(desired_num_ticks=len(colors)),\n", - " label_standoff=6, border_line_color=None, location=(0, 0))\n", - " p.add_layout(color_bar, \"right\")\n", - "\n", - " # Define hover tool\n", - " p.select_one(HoverTool).tooltips = [\n", - " (\"Node IP Address\", \"@node_ip_address\"),\n", - " (\"Number of tasks running\", \"@num_tasks\"),\n", - " (\"Time\", \"@time\")\n", - " ]\n", - "\n", - " # Define the axis labels \n", - " p.xaxis.axis_label = \"Time in seconds\"\n", - " p.yaxis.axis_label = \"Node IP Address\"\n", - " handle = show(p, notebook_handle=True)\n", - " workers = ray.global_state.workers()\n", - " # Function to update the heat map \n", - " def heat_map_update(abs_earliest, abs_latest, abs_num_tasks, tasks):\n", - " if len(tasks) == 0:\n", - " return\n", - " \n", - " granularity = 1\n", - " earliest = time.time()\n", - " latest = 0\n", - " \n", - " node_to_tasks = dict()\n", - " # Determine which task has the earlest start time out of the ones passed into the \n", - " # update function\n", - " for task_id, data in tasks.items():\n", - " if data[\"score\"] > latest:\n", - " latest = data[\"score\"]\n", - " if data[\"score\"] < earliest:\n", - " earliest = data[\"score\"]\n", - " worker_id = data[\"worker_id\"]\n", - " node_ip = workers[worker_id][\"node_ip_address\"]\n", - " if node_ip not in 
node_to_tasks: \n", - " node_to_tasks[node_ip] = {}\n", - " node_to_tasks[node_ip][task_id] = data\n", - " \n", - " nodes = []\n", - " times = []\n", - " lengths = []\n", - " num_tasks = []\n", - " \n", - " for node_ip, task_dict in node_to_tasks.items():\n", - " left, right, top = compute_utilizations(earliest, latest, abs_num_tasks, task_dict, 100, True)\n", - " for (l, r, t) in zip(left, right, top):\n", - " nodes.append(node_ip)\n", - " times.append((l + r) / 2)\n", - " lengths.append(r - l)\n", - " num_tasks.append(t)\n", - "\n", - " # Set the y range of the plot to be the node IP addresses \n", - " p.y_range.factors = list(set(nodes))\n", - " \n", - " mapper.low = min(min(num_tasks), 0)\n", - " mapper.high = max(max(num_tasks), 1)\n", - "\n", - " # Update plot with new data based on slider and text box values \n", - " source.data = {\"node_ip_address\": nodes, \"time\": times, \"num_tasks\": num_tasks, \"length\": lengths}\n", - " \n", - " push_notebook(handle=handle)\n", - "\n", - " get_sliders(heat_map_update)\n", - " \n", - "cluster_usage()" + "ui.cpu_usage()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Cluster usage." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ui.cluster_usage()" ] } ], diff --git a/python/ray/experimental/ui.py b/python/ray/experimental/ui.py new file mode 100644 index 000000000..a1016e79c --- /dev/null +++ b/python/ray/experimental/ui.py @@ -0,0 +1,722 @@ +import ipywidgets as widgets +import numpy as np +import os +import pprint +import ray +import subprocess +import tempfile +import time + +from IPython.display import display + +# Instances of this class maintains keep track of whether or not a +# callback is currently executing. Since the execution of the callback +# may trigger more calls to the callback, this is used to prevent infinite +# recursions. 
+ + +class _EventRecursionContextManager(object): + def __init__(self): + self.should_recurse = True + + def __enter__(self): + self.should_recurse = False + + def __exit__(self, *args): + self.should_recurse = True + + +total_time_value = "% total time" +total_tasks_value = "% total tasks" + + +# Function that returns instances of sliders and handles associated events. + +def get_sliders(update): + # Start_box value indicates the desired start point of queried window. + start_box = widgets.FloatText( + description="Start Time:", + disabled=True, + ) + + # End_box value indicates the desired end point of queried window. + end_box = widgets.FloatText( + description="End Time:", + disabled=True, + ) + + # Percentage slider. Indicates either % of total time or total tasks + # depending on what breakdown_opt is set to. + range_slider = widgets.IntRangeSlider( + value=[70, 100], + min=0, + max=100, + step=1, + description="%:", + continuous_update=False, + orientation="horizontal", + readout=True, + readout_format=".0i%", + ) + + # Indicates the number of tasks that the user wants to be returned. Is + # disabled when the breakdown_opt value is set to total_time_value. + num_tasks_box = widgets.IntText( + description="Num Tasks:", + disabled=False + ) + + # Dropdown bar that lets the user choose between modifying % of total + # time or total number of tasks. + breakdown_opt = widgets.Dropdown( + options=[total_time_value, total_tasks_value], + value=total_tasks_value, + description="Selection Options:" + ) + + # Initially passed in to the update_wrapper function. + INIT_EVENT = "INIT" + + # Create instance of context manager to determine whether callback is + # currently executing + out_recursion = _EventRecursionContextManager() + + def update_wrapper(event): + # Feature received a callback, but it shouldn't be executed + # because the callback was the result of a different feature + # executing its callback based on user input. 
+ if not out_recursion.should_recurse: + return + + # Feature received a callback and it should be executed because + # the callback was the result of user input. + with out_recursion: + smallest, largest, num_tasks = ray.global_state._job_length() + diff = largest - smallest + if num_tasks is not 0: + + # Describes the initial values that the slider/text box + # values should be set to. + if event == INIT_EVENT: + if breakdown_opt.value == total_tasks_value: + num_tasks_box.value = -min(10000, num_tasks) + range_slider.value = (int(100 - + (100. * -num_tasks_box.value) + / num_tasks), 100) + else: + low, high = map(lambda x: x / 100., range_slider.value) + start_box.value = round(diff * low, 2) + end_box.value = round(diff * high, 2) + + # Event was triggered by a change in the start_box value. + elif event["owner"] == start_box: + if start_box.value > end_box.value: + start_box.value = end_box.value + elif start_box.value < 0: + start_box.value = 0 + low, high = range_slider.value + range_slider.value = (int((start_box.value * 100.) + / diff), high) + + # Event was triggered by a change in the end_box value. + elif event["owner"] == end_box: + if start_box.value > end_box.value: + end_box.value = start_box.value + elif end_box.value > diff: + end_box.value = diff + low, high = range_slider.value + range_slider.value = (low, int((end_box.value * 100.) + / diff)) + + # Event was triggered by a change in the breakdown options + # toggle. + elif event["owner"] == breakdown_opt: + if breakdown_opt.value == total_tasks_value: + start_box.disabled = True + end_box.disabled = True + num_tasks_box.disabled = False + num_tasks_box.value = min(10000, num_tasks) + range_slider.value = (int(100 - + (100. * num_tasks_box.value) + / num_tasks), 100) + else: + start_box.disabled = False + end_box.disabled = False + num_tasks_box.disabled = True + range_slider.value = (int((start_box.value * 100.) + / diff), + int((end_box.value * 100.) 
+ / diff)) + + # Event was triggered by a change in the range_slider + # value. + elif event["owner"] == range_slider: + low, high = map(lambda x: x / 100., range_slider.value) + if breakdown_opt.value == total_tasks_value: + old_low, old_high = event["old"] + new_low, new_high = event["new"] + if old_low != new_low: + range_slider.value = (new_low, 100) + num_tasks_box.value = (-(100. - new_low) + / 100. * num_tasks) + else: + range_slider.value = (0, new_high) + num_tasks_box.value = new_high / 100. * num_tasks + else: + start_box.value = round(diff * low, 2) + end_box.value = round(diff * high, 2) + + # Event was triggered by a change in the num_tasks_box + # value. + elif event["owner"] == num_tasks_box: + if num_tasks_box.value > 0: + range_slider.value = (0, int(100 * + float(num_tasks_box.value) + / num_tasks)) + elif num_tasks_box.value < 0: + range_slider.value = (100 + + int(100 * + float(num_tasks_box.value) + / num_tasks), 100) + + if not update: + return + + diff = largest - smallest + + # Low and high are used to scale the times that are + # queried to be relative to the absolute time. + low, high = map(lambda x: x / 100., range_slider.value) + + # Queries to task_profiles based on the slider and text + # box values. + # (Querying based on the % total amount of time.) + if breakdown_opt.value == total_time_value: + tasks = ray.global_state.task_profiles(start=(smallest + + diff * low), + end=(smallest + + diff * high)) + + # (Querying based on % of total number of tasks that were + # run.) + elif breakdown_opt.value == total_tasks_value: + if range_slider.value[0] == 0: + tasks = ray.global_state.task_profiles(num_tasks=(int( + num_tasks + * high)), + fwd=True) + else: + tasks = ray.global_state.task_profiles(num_tasks=(int( + num_tasks * + (high - low))), + fwd=False) + + update(smallest, largest, num_tasks, tasks) + + # Get updated values from a slider or text box, and update the rest of + # them accordingly. 
def object_search_bar():
    """Display a text box that looks up an object ID in the object table.

    When the text box is submitted, its current value is passed to
    ``ray.global_state.object_table`` and the result is pretty-printed.
    """
    object_search = widgets.Text(
        value="",
        placeholder="Object ID",
        description="Search for an object:",
        disabled=False
    )
    display(object_search)

    def handle_submit(sender):
        # Pretty-print whatever the object table returns for the typed ID.
        pp = pprint.PrettyPrinter()
        pp.pprint(ray.global_state.object_table(object_search.value))

    object_search.on_submit(handle_submit)


def task_search_bar():
    """Display a text box that looks up a task ID in the task table.

    When the text box is submitted, its current value is passed to
    ``ray.global_state.task_table`` and the result is pretty-printed.
    """
    task_search = widgets.Text(
        value="",
        placeholder="Task ID",
        description="Search for a task:",
        disabled=False
    )
    display(task_search)

    def handle_submit(sender):
        # Pretty-print whatever the task table returns for the typed ID.
        pp = pprint.PrettyPrinter()
        pp.pprint(ray.global_state.task_table(task_search.value))

    task_search.on_submit(handle_submit)


def task_timeline():
    """Create the widgets used to dump and view a task timeline.

    A button and a "View options" dropdown are created, together with the
    time/task-range widgets returned by ``get_sliders``. Clicking the button
    dumps the selected task profiles to a temporary JSON trace, converts it
    to HTML with catapult's ``trace2html`` and opens the result in Chrome.

    Raises (from the click handler):
        ValueError: If a dropdown holds an unexpected value.
        RuntimeError: If ``trace2html`` is still missing after cloning.
    """
    path_input = widgets.Button(description="View task timeline")

    breakdown_basic = "Basic"
    breakdown_task = "Task Breakdowns"

    breakdown_opt = widgets.Dropdown(
        options=[breakdown_basic, breakdown_task],
        value=breakdown_basic,
        description="View options:",
        disabled=False,
    )

    # False is passed in place of an update callback, so the sliders only
    # track the selected range and do not redraw anything themselves.
    start_box, end_box, range_slider, time_opt = get_sliders(False)
    # NOTE(review): the dropdown and the button are created but never
    # displayed here, so the click handler below cannot be reached from the
    # notebook as written. Confirm whether these calls should be restored.
    # display(breakdown_opt)
    # display(path_input)

    def find_trace2html():
        trace2html = "/tmp/ray/catapult/tracing/bin/trace2html"
        # Clone the catapult repository if it doesn't exist. TODO(rkn): We
        # could do this in the build.sh script later on.
        if not os.path.exists(trace2html):
            # Announce the clone before starting it; the clone blocks until
            # it has finished.
            print("Cloning catapult to /tmp/ray/catapult.")
            subprocess.check_output([
                "git",
                "clone",
                "https://github.com/catapult-project/catapult.git",
                "/tmp/ray/catapult",
            ])
        # Explicit check instead of ``assert`` so it survives ``python -O``.
        if not os.path.exists(trace2html):
            raise RuntimeError(
                "trace2html was not found at '{}'".format(trace2html))
        return trace2html

    def make_temp_path(suffix):
        # mkstemp creates the file atomically (unlike the deprecated,
        # race-prone mktemp). Only the path is needed, so close the handle.
        fd, path = tempfile.mkstemp(suffix=suffix)
        os.close(fd)
        return path

    def handle_submit(sender):
        if breakdown_opt.value == breakdown_basic:
            breakdown = False
        elif breakdown_opt.value == breakdown_task:
            breakdown = True
        else:
            raise ValueError(
                "Unexpected breakdown value '{}'".format(breakdown_opt.value))

        # The slider holds percentages; convert them to fractions.
        low, high = map(lambda x: x / 100., range_slider.value)

        smallest, largest, num_tasks = ray.global_state._job_length()
        diff = largest - smallest

        if time_opt.value == total_time_value:
            # Select by fraction of the job's total time span.
            tasks = ray.global_state.task_profiles(
                start=smallest + diff * low,
                end=smallest + diff * high)
        elif time_opt.value == total_tasks_value:
            # Select by fraction of the total number of tasks, counted
            # forwards when the slider starts at 0 and backwards otherwise.
            if range_slider.value[0] == 0:
                tasks = ray.global_state.task_profiles(
                    num_tasks=int(num_tasks * high),
                    fwd=True)
            else:
                tasks = ray.global_state.task_profiles(
                    num_tasks=int(num_tasks * (high - low)),
                    fwd=False)
        else:
            raise ValueError(
                "Unexpected time value '{}'".format(time_opt.value))

        # Create the temporary files only after the inputs were validated so
        # a rejected request leaves nothing behind.
        tmp = make_temp_path(".json")
        tmp2 = make_temp_path(".html")

        print("{} tasks to trace".format(len(tasks)))
        print("Dumping task profiling data to " + tmp)
        ray.global_state.dump_catapult_trace(tmp,
                                             tasks,
                                             breakdowns=breakdown)
        print("Converting chrome trace to " + tmp2)
        trace2html = find_trace2html()
        # TODO(rkn): The trace2html script currently requires Python 2.
        # Remove this dependency.
        subprocess.check_output(["python2",
                                 trace2html,
                                 tmp,
                                 "--output",
                                 tmp2])
        # Open the timeline in Chrome. TODO(rkn): We should remove the
        # dependency on Chrome and use whatever browser is currently being
        # used. Note that this currently does not work when Ray is being
        # used on a cluster and the browser is running locally.
        print("Opening html file in browser...")
        subprocess.Popen(["open", "-a", "Google Chrome", tmp2])

    path_input.on_click(handle_submit)
def compute_utilizations(abs_earliest,
                         abs_latest,
                         num_tasks,
                         tasks,
                         num_buckets,
                         use_abs_times=False):
    """Bucket task run time into a utilization histogram.

    The time window is split into ``num_buckets`` equal buckets. For every
    task, the time between its "get_arguments_start" and "store_outputs_end"
    entries is added to each bucket it overlaps. Each bucket total is then
    divided by the bucket length.

    Args:
        abs_earliest: Reference start time. It is subtracted from the
            returned edges, and is the window start when ``use_abs_times``
            is true.
        abs_latest: Window end when ``use_abs_times`` is true.
        num_tasks: Unused; kept so existing callers keep working.
        tasks: Dict mapping task ID to a dict holding at least
            "get_arguments_start" and "store_outputs_end".
        num_buckets: Number of histogram buckets; must be at least 1.
        use_abs_times: If true, use ``abs_earliest``/``abs_latest`` as the
            window instead of deriving it from ``tasks``.

    Returns:
        A ``(left_edges, right_edges, utilization)`` tuple of lists with
        ``num_buckets`` entries each, or three empty lists when ``tasks`` is
        empty.

    Raises:
        ValueError: If ``tasks`` is non-empty and ``num_buckets`` is below 1.
    """
    if len(tasks) == 0:
        return [], [], []
    if num_buckets < 1:
        raise ValueError(
            "num_buckets must be at least 1, got {}".format(num_buckets))

    # Small padding used to keep the window strictly positive in length.
    epsilon = 1e-6

    if use_abs_times:
        earliest_time = abs_earliest
        latest_time = abs_latest
    else:
        # Derive the window from the tasks themselves.
        earliest_time = min(data["get_arguments_start"]
                            for data in tasks.values())
        # Add some epsilon to latest_time to ensure that the end time of the
        # last task falls __within__ a bucket, and not on the edge.
        latest_time = max(data["store_outputs_end"]
                          for data in tasks.values()) + epsilon

    # A zero-length (or inverted) window would make the bucket length zero
    # and the index computation below divide by zero; widen it minimally.
    if latest_time <= earliest_time:
        latest_time = earliest_time + epsilon

    bucket_time_length = (latest_time - earliest_time) / float(num_buckets)
    cpu_time = [0 for _ in range(num_buckets)]

    for data in tasks.values():
        task_start_time = data["get_arguments_start"]
        task_end_time = data["store_outputs_end"]

        start_bucket = int((task_start_time - earliest_time)
                           / bucket_time_length)
        end_bucket = int((task_end_time - earliest_time)
                         / bucket_time_length)

        # Walk over each in-range bucket that this task may intersect,
        # adding the amount of time the task spends inside that bucket.
        first_idx = max(start_bucket, 0)
        last_idx = min(end_bucket, num_buckets - 1)
        for bucket_idx in range(first_idx, last_idx + 1):
            bucket_start_time = (earliest_time + bucket_idx
                                 * bucket_time_length)
            bucket_end_time = (earliest_time + (bucket_idx + 1)
                               * bucket_time_length)
            overlap = (min(task_end_time, bucket_end_time) -
                       max(task_start_time, bucket_start_time))
            # A task lying outside the window can produce a negative
            # overlap with an edge bucket; it must not subtract time.
            if overlap > 0:
                cpu_time[bucket_idx] += overlap

    # Average utilization of a bucket is its accumulated task time divided
    # by the bucket length.
    cpu_utilization = [total / float(bucket_time_length)
                       for total in cpu_time]

    # Generate histogram bucket edges. Subtract out abs_earliest to get
    # relative time.
    all_edges = [earliest_time - abs_earliest + i * bucket_time_length
                 for i in range(num_buckets + 1)]
    # Left edges are all but the rightmost edge, right edges are all but
    # the leftmost edge.
    left_edges = all_edges[:-1]
    right_edges = all_edges[1:]

    return left_edges, right_edges, cpu_utilization
# Function to create the cluster usage "heat map"
def cluster_usage():
    """Show a per-node heat map of task activity over time.

    A Bokeh figure is drawn in the notebook with one row per node IP
    address. The range widgets created by ``get_sliders`` call
    ``heat_map_update`` with the selected tasks, which recomputes the
    per-node utilization buckets and pushes them to the figure.
    """
    from bokeh.io import show, output_notebook, push_notebook
    from bokeh.resources import CDN
    from bokeh.plotting import figure
    from bokeh.models import (
        ColumnDataSource,
        HoverTool,
        LinearColorMapper,
        BasicTicker,
        ColorBar,
    )
    output_notebook(resources=CDN)

    # Placeholder data shown until the first update arrives.
    source = ColumnDataSource(data={"node_ip_address": ['127.0.0.1'],
                                    "time": ['0.5'],
                                    "num_tasks": ['1'],
                                    "length": [1]})

    # Define the color schema
    colors = ["#75968f",
              "#a5bab7",
              "#c9d9d3",
              "#e2e2e2",
              "#dfccce",
              "#ddb7b1",
              "#cc7878",
              "#933b41",
              "#550b1d"]
    mapper = LinearColorMapper(palette=colors, low=0, high=2)

    TOOLS = "hover, save, xpan, box_zoom, reset, xwheel_zoom"

    # Create the plot
    p = figure(title="Cluster Usage",
               y_range=list(set(source.data['node_ip_address'])),
               x_axis_location="above",
               plot_width=900,
               plot_height=500,
               tools=TOOLS,
               toolbar_location='below')

    # Format the plot axes
    p.grid.grid_line_color = None
    p.axis.axis_line_color = None
    p.axis.major_tick_line_color = None
    p.axis.major_label_text_font_size = "10pt"
    p.axis.major_label_standoff = 0
    p.xaxis.major_label_orientation = np.pi / 3

    # Plot rectangles
    p.rect(x="time", y="node_ip_address", width="length", height=1,
           source=source,
           fill_color={"field": "num_tasks", "transform": mapper},
           line_color=None)

    # Add legend to the side of the plot
    color_bar = ColorBar(color_mapper=mapper,
                         major_label_text_font_size="8pt",
                         ticker=BasicTicker(desired_num_ticks=len(colors)),
                         label_standoff=6,
                         border_line_color=None,
                         location=(0, 0))
    p.add_layout(color_bar, "right")

    # Define hover tool
    p.select_one(HoverTool).tooltips = [
        ("Node IP Address", "@node_ip_address"),
        ("Number of tasks running", "@num_tasks"),
        ("Time", "@time")
    ]

    # Define the axis labels
    p.xaxis.axis_label = "Time in seconds"
    p.yaxis.axis_label = "Node IP Address"
    handle = show(p, notebook_handle=True)
    workers = ray.global_state.workers()

    # Function to update the heat map
    def heat_map_update(abs_earliest, abs_latest, abs_num_tasks, tasks):
        if len(tasks) == 0:
            return

        # The plotted window spans the smallest to the largest "score" among
        # the tasks that were passed in.
        # NOTE(review): "score" is presumably a timestamp on the same scale
        # as the task start/end times; confirm against task_profiles.
        scores = [data["score"] for data in tasks.values()]
        earliest = min(scores)
        latest = max(scores)
        # With a single task (or identical scores) the window has zero
        # length, and bucketing it with absolute times divides by zero.
        # Widen it by a tiny amount instead.
        if latest <= earliest:
            latest = earliest + 1e-6

        # Group the tasks by the IP address of the node that ran them.
        node_to_tasks = dict()
        for task_id, data in tasks.items():
            node_ip = workers[data["worker_id"]]["node_ip_address"]
            node_to_tasks.setdefault(node_ip, {})[task_id] = data

        nodes = []
        times = []
        lengths = []
        num_tasks = []

        for node_ip, task_dict in node_to_tasks.items():
            left, right, top = compute_utilizations(earliest,
                                                    latest,
                                                    abs_num_tasks,
                                                    task_dict,
                                                    100,
                                                    True)
            # One rectangle per bucket, centered on the bucket midpoint.
            for (left_edge, right_edge, value) in zip(left, right, top):
                nodes.append(node_ip)
                times.append((left_edge + right_edge) / 2)
                lengths.append(right_edge - left_edge)
                num_tasks.append(value)

        # Set the y range of the plot to be the node IP addresses
        p.y_range.factors = list(set(nodes))

        # Keep the color scale covering at least the range [0, 1].
        mapper.low = min(min(num_tasks), 0)
        mapper.high = max(max(num_tasks), 1)

        # Update plot with new data based on slider and text box values
        source.data = {"node_ip_address": nodes,
                       "time": times,
                       "num_tasks": num_tasks,
                       "length": lengths}

        push_notebook(handle=handle)

    get_sliders(heat_map_update)